From 24d35ab47bdec651706a221974424409d9ab036b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:03:38 +0100 Subject: Add new storage functions for new replication The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one. --- synapse/handlers/typing.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c..d6809862e0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -293,6 +293,9 @@ class TypingHandler(object): rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): -- cgit 1.5.1 From e9dd8370b0ae8ecc1ac3bf13857a230ae9f72199 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:56:25 +0100 Subject: Add functions to presence to support remote syncs The TCP replication protocol streams deltas of who has started or stopped syncing. This is different from the HTTP API which periodically sends the full list of users who are syncing. This commit adds support for the new TCP style of sending deltas. --- synapse/handlers/presence.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1ede117c79..9f9257029e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.async import Linearizer from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -187,6 +188,7 @@ class PresenceHandler(object): # process_id to millisecond timestamp last updated. self.external_process_to_current_syncs = {} self.external_process_last_updated_ms = {} + self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to @@ -508,6 +510,70 @@ class PresenceHandler(object): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks + def update_external_syncs_row(self, process_id, user_id, is_syncing): + """Update the syncing users for an external process as a delta. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + """ + with (yield self.external_sync_linearizer.queue(process_id)): + prev_state = yield self.current_state_for_user(user_id) + + process_presence = self.external_process_to_current_syncs.setdefault( + process_id, set() + ) + time_now_ms = self.clock.time_msec() + + updates = [] + if is_syncing and user_id not in process_presence: + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + process_presence.add(user_id) + elif user_id in process_presence: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + if updates: + yield self._update_states(updates) + + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + + @defer.inlineCallbacks + def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + """ + with (yield self.external_sync_linearizer.queue(process_id)): + process_presence = self.external_process_to_current_syncs.pop( + process_id, set() + ) + prev_states = yield self.current_state_for_users(process_presence) + time_now_ms = self.clock.time_msec() + + yield self._update_states([ + prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + ) + for prev_state in prev_states.itervalues() + ]) + self.external_process_last_updated_ms.pop(process_id, None) + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. -- cgit 1.5.1 From 63fcc42990765563867536de550d18412f172626 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 14:26:08 +0100 Subject: Remove user from process_presence when stops syncing --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9f9257029e..ac6d1107bf 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,6 +546,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + process_presence.discard(user_id) if updates: yield self._update_states(updates) -- cgit 1.5.1 From 9d0170ac6cb54f5fe86cf1a5121d688772a544f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:36:32 +0100 Subject: Fix up presence --- synapse/handlers/presence.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ac6d1107bf..9e14760659 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,12 +546,14 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + + if not is_syncing: process_presence.discard(user_id) if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_last_updated_ms[process_id] = time_now_ms @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): -- cgit 1.5.1 From 36d2b66f90dcdfae843c4b0ee7cba1dea23486f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:46:20 +0100 Subject: Add a timestamp to USER_SYNC command This timestamp is used to indicate when the user last sync'd --- synapse/handlers/presence.py | 14 +++++++------- synapse/replication/tcp/commands.py | 15 ++++++++++----- synapse/replication/tcp/protocol.py | 7 +++++-- synapse/replication/tcp/resource.py | 4 ++-- 4 files changed, 24 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e14760659..53baf3e79a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -511,7 +511,7 @@ class PresenceHandler(object): self.external_process_to_current_syncs[process_id] = syncing_user_ids @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing): + def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): """Update the syncing users for an external process as a delta. Args: @@ -520,6 +520,7 @@ class PresenceHandler(object): as user start and stop syncing against a given process. user_id (str): The user who has started or stopped syncing is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing """ with (yield self.external_sync_linearizer.queue(process_id)): prev_state = yield self.current_state_for_user(user_id) @@ -527,24 +528,23 @@ class PresenceHandler(object): process_presence = self.external_process_to_current_syncs.setdefault( process_id, set() ) - time_now_ms = self.clock.time_msec() updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: updates.append(prev_state.copy_and_replace( state=PresenceState.ONLINE, - last_active_ts=time_now_ms, - last_user_sync_ts=time_now_ms, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, )) else: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) process_presence.add(user_id) elif user_id in process_presence: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) if not is_syncing: @@ -553,7 +553,7 @@ class PresenceHandler(object): if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = time_now_ms + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 68165cf2dc..84d2a2272a 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -189,29 +189,34 @@ class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or stopped syncing. Used to calculate presence on the master. + Includes a timestamp of when the last user sync was. + Format:: - USER_SYNC + USER_SYNC Where is either "start" or "stop" """ NAME = "USER_SYNC" - def __init__(self, user_id, is_syncing): + def __init__(self, user_id, is_syncing, last_sync_ms): self.user_id = user_id self.is_syncing = is_syncing + self.last_sync_ms = last_sync_ms @classmethod def from_line(cls, line): - user_id, state = line.split(" ", 1) + user_id, state, last_sync_ms = line.split(" ", 2) if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(user_id, state == "start") + return cls(user_id, state == "start", int(last_sync_ms)) def to_line(self): - return " ".join((self.user_id, "start" if self.is_syncing else "end")) + return " ".join(( + self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms), + )) class FederationAckCommand(Command): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index c1dc91bdb7..80f732b455 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -368,7 +368,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.name = cmd.data def on_USER_SYNC(self, cmd): - self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + self.streamer.on_user_sync( + self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms, + ) def on_REPLICATE(self, cmd): stream_name = cmd.stream_name @@ -481,8 +483,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) currently_syncing = self.handler.get_currently_syncing_users() + now = self.clock.time_msec() for user_id in currently_syncing: - self.send_command(UserSyncCommand(user_id, True)) + self.send_command(UserSyncCommand(user_id, True, now)) # We've now finished connecting to so inform the client handler self.handler.update_connection(self) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d5da0496a8..243a81d488 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -220,12 +220,12 @@ class ReplicationStreamer(object): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") - def on_user_sync(self, conn_id, user_id, is_syncing): + def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() self.presence_handler.update_external_syncs_row( - conn_id, user_id, is_syncing + conn_id, user_id, is_syncing, last_sync_ms, ) @measure_func("repl.on_remove_pusher") -- cgit 1.5.1 From 0b08c48fc5269a04325791c96ddd389a7cfe502a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Apr 2017 15:43:37 +0100 Subject: Remove more spurious `PreserveLoggingContext`s Remove `PreserveLoggingContext` around calls to `Notifier.on_new_room_event`; there is no problem if the logcontext is set when calling it. --- synapse/handlers/federation.py | 43 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 24 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 888dd01240..737e2f7160 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_fn, preserve_context_over_deferred + preserve_fn, preserve_context_over_deferred ) from synapse.util.metrics import measure_func from synapse.util.logutils import log_function @@ -394,11 +394,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) if event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -916,11 +915,10 @@ class FederationHandler(BaseHandler): origin, auth_chain, state, event ) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[joinee] - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[joinee] + ) logger.debug("Finished joining %s to %s", joinee, room_id) finally: @@ -1025,10 +1023,9 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: @@ -1074,11 +1071,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) defer.returnValue(event) @@ -1236,10 +1232,9 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) defer.returnValue(None) -- cgit 1.5.1 From 7eb9f34cc3c2845a0ef35c9524f7ecc14339f7c1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Apr 2017 15:44:19 +0100 Subject: Remove spurious yield In `MessageHandler`, remove `yield` on call to `Notifier.on_new_room_event`: it doesn't return anything anyway. --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7a498af5a2..348056add5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -612,7 +612,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - yield self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) -- cgit 1.5.1 From 96b9b6c1275087acf57faf1306bcc392dbd9f842 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:34:20 +0100 Subject: Don't double json encode typing replication data --- synapse/app/synchrotron.py | 4 +--- synapse/handlers/typing.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1ef5dfa77..1fac021ea9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -62,7 +62,6 @@ import sys import logging import contextlib import gc -import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -215,9 +214,8 @@ class SynchrotronTyping(object): self._latest_room_serial = token for row in rows: - typing = json.loads(row.user_ids) self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = typing + self._room_typing[row.room_id] = row.user_ids class SynchrotronApplicationService(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d6809862e0..3b7818af5c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id import logging from collections import namedtuple -import ujson as json logger = logging.getLogger(__name__) @@ -288,8 +287,7 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps(list(typing), ensure_ascii=False) - rows.append((serial, room_id, typing_bytes)) + rows.append((serial, room_id, list(typing))) rows.sort() return rows -- cgit 1.5.1 From 877c029c16c41b29efde120af7894f5ace43c8fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Apr 2017 15:51:12 +0100 Subject: Use iteritems --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 53baf3e79a..9ed5af3cb4 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -596,14 +596,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: new = { user_id: UserPresenceState.default(user_id) -- cgit 1.5.1 From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:48:30 +0100 Subject: Reduce federation presence replication traffic This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Before we were duplicating by sending the full state across once per destination. --- synapse/app/federation_sender.py | 12 ++++ synapse/app/synchrotron.py | 6 +- synapse/federation/send_queue.py | 47 ++++++-------- synapse/federation/transaction_queue.py | 99 ++++++++++++++++++++++++++--- synapse/handlers/presence.py | 54 ++++------------ synapse/replication/slave/storage/events.py | 1 + 6 files changed, 139 insertions(+), 80 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 477e16e0fa..49efb602bc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -80,6 +81,17 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + # This is fine since in practice nobody uses the presence list stuff... + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161fe..7b6f82abdc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..a12c18f4df 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -469,7 +462,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..fd9e1fa01c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id @@ -78,6 +78,7 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence = {} self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} @@ -113,6 +114,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -224,17 +227,95 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): + @preserve_fn + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + self.pending_presence.update({state.user_id: state for state in states}) + + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb4..c1c0dd4d3d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + hosts_to_states (list): list(state) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -837,14 +812,13 @@ class PresenceHandler(object): if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5fd47706ef..4ca1e5aa8c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] get_users_who_share_room_with_user = ( RoomMemberStore.__dict__["get_users_who_share_room_with_user"] ) -- cgit 1.5.1 From e263c26690685b713ab8a5f4fdb7b9e92a7b8d99 Mon Sep 17 00:00:00 2001 From: lukebarnard Date: Tue, 11 Apr 2017 11:55:30 +0100 Subject: Initial commit of RM server-side impl (See https://docs.google.com/document/d/1UWqdS-e1sdwkLDUY0wA4gZyIkRp-ekjsLZ8k6g_Zvso/edit#heading=h.lndohpg8at5u) --- synapse/handlers/read_marker.py | 82 +++++++++++++++++++++++++++++ synapse/rest/client/v2_alpha/read_marker.py | 71 +++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 synapse/handlers/read_marker.py create mode 100644 synapse/rest/client/v2_alpha/read_marker.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py new file mode 100644 index 0000000000..77164f3f49 --- /dev/null +++ b/synapse/handlers/read_marker.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext +from synapse.types import get_domain_from_id + +import logging + + +logger = logging.getLogger(__name__) + + +class ReadMarkerHandler(BaseHandler): + def __init__(self, hs): + super(ReadMarkerHandler, self).__init__(hs) + + self.server_name = hs.config.server_name + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def received_client_read_marker(self, room_id, user_id, event_id): + """NEEDS DOC + """ + + room_id = read_marker["room_id"] + user_id = read_marker["user_id"] + event_id = read_marker["event_id"] + + # Get ordering for existing read marker + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + existing_read_marker = account_data["m.read_marker"] + + if existing_read_marker: + # Get ordering for new read marker + res = self.store._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + new_to = int(res["topological_ordering"]) if res else None + new_so = int(res["stream_ordering"]) if res else None + + res = self.store._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": existing_read_marker.content.marker}, + allow_none=True + ) + existing_to = int(res["topological_ordering"]) if res else None + existing_so = int(res["stream_ordering"]) if res else None + + if new_to > existing_to: + return False + elif new_to == existing_to and new_so >= existing_so: + return False + + # Update account data + content = { + "marker": event_id + } + yield self.store.add_account_data_to_room( + user_id, room_id, "m.read_marker", content + ) diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py new file mode 100644 index 0000000000..02408eaf10 --- /dev/null +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from ._base import client_v2_patterns + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptRestServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/rooms/(?P[^/]*)" + "/read_marker$" + ) + + def __init__(self, hs): + super(ReceiptRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.receipts_handler = hs.get_receipts_handler() + self.read_marker_handler = hs.get_read_marker_handler() + self.presence_handler = hs.get_presence_handler() + + @defer.inlineCallbacks + def on_POST(self, request, room_id, receipt_type, event_id): + requester = yield self.auth.get_user_by_req(request) + + yield self.presence_handler.bump_presence_active_time(requester.user) + + body = parse_json_object_from_request(request) + + if "m.read" in body: + read_event_id = body["m.read"]; + yield self.receipts_handler.received_client_receipt( + room_id, + "m.read", + user_id=requester.user.to_string(), + event_id=read_event_id + ) + + if "m.read_marker" in body: + read_marker_event_id = body["m.read_marker"] + yield self.read_marker_handler.received_client_read_marker( + room_id, + user_id=requester.user.to_string(), + event_id=read_marker_event_id + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReceiptRestServlet(hs).register(http_server) -- cgit 1.5.1 From d892079844eec9dd7855ee66f3ad3225df4bdbc0 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 15:01:39 +0100 Subject: Finish implementing RM endpoint - This change causes a 405 to be sent if "m.read_marker" is set via /account_data - This also fixes-up the RM endpoint so that it actually Works. --- synapse/handlers/read_marker.py | 87 +++++++++++++++------------- synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/account_data.py | 8 ++- synapse/rest/client/v2_alpha/read_marker.py | 13 ++--- synapse/server.py | 5 ++ 5 files changed, 67 insertions(+), 48 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 77164f3f49..021faff376 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -18,65 +18,74 @@ from ._base import BaseHandler from twisted.internet import defer from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import Linearizer from synapse.types import get_domain_from_id +from synapse.api.errors import SynapseError import logging - - logger = logging.getLogger(__name__) - class ReadMarkerHandler(BaseHandler): def __init__(self, hs): super(ReadMarkerHandler, self).__init__(hs) - self.server_name = hs.config.server_name self.store = hs.get_datastore() + self.read_marker_linearizer = Linearizer(name="read_marker") + self.notifier = hs.get_notifier() @defer.inlineCallbacks def received_client_read_marker(self, room_id, user_id, event_id): - """NEEDS DOC - """ + """Updates the read marker for a given user in a given room if the event ID given + is ahead in the stream relative to the current read marker. - room_id = read_marker["room_id"] - user_id = read_marker["user_id"] - event_id = read_marker["event_id"] + This uses a notifier to indicate that account data should be sent down /sync if + the read marker has changed. + """ # Get ordering for existing read marker - account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data["m.read_marker"] + with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + existing_read_marker = account_data["m.read_marker"] - if existing_read_marker: - # Get ordering for new read marker - res = self.store._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - new_to = int(res["topological_ordering"]) if res else None - new_so = int(res["stream_ordering"]) if res else None + should_update = True - res = self.store._simple_select_one_txn( - txn, + res = yield self.store._simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker.content.marker}, + keyvalues={"event_id": event_id}, allow_none=True ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - if new_to > existing_to: - return False - elif new_to == existing_to and new_so >= existing_so: - return False - # Update account data - content = { - "marker": event_id - } - yield self.store.add_account_data_to_room( - user_id, room_id, "m.read_marker", content - ) + if not res: + raise SynapseError(404, 'Event does not exist') + + if existing_read_marker: + new_to = int(res["topological_ordering"]) + new_so = int(res["stream_ordering"]) + + # Get ordering for existing read marker + res = yield self.store._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": existing_read_marker['marker']}, + allow_none=True + ) + existing_to = int(res["topological_ordering"]) if res else None + existing_so = int(res["stream_ordering"]) if res else None + + # Prevent updating if the existing marker is ahead in the stream + if existing_to > new_to: + should_update = False + elif existing_to == new_to and existing_so >= new_so: + should_update = False + + if should_update: + content = { + "marker": event_id + } + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, "m.read_marker", content + ) + self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id], rooms=[room_id] + ) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index f9f5a3e077..aa8d874f96 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -40,6 +40,7 @@ from synapse.rest.client.v2_alpha import ( register, auth, receipts, + read_marker, keys, tokenrefresh, tags, @@ -88,6 +89,7 @@ class ClientRestResource(JsonResource): register.register_servlets(hs, client_resource) auth.register_servlets(hs, client_resource) receipts.register_servlets(hs, client_resource) + read_marker.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index b16079cece..a846ab1dc1 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -16,7 +16,7 @@ from ._base import client_v2_patterns from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, SynapseError from twisted.internet import defer @@ -82,6 +82,12 @@ class RoomAccountDataServlet(RestServlet): body = parse_json_object_from_request(request) + if account_data_type == "m.read_marker": + raise SynapseError(405, + "Cannot set m.read_marker through this API. " + "Use /rooms/!roomId:server.name/read_marker" + ) + max_id = yield self.store.add_account_data_to_room( user_id, room_id, account_data_type, body ) diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index 02408eaf10..49ada9c047 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -25,14 +25,11 @@ import logging logger = logging.getLogger(__name__) -class ReceiptRestServlet(RestServlet): - PATTERNS = client_v2_patterns( - "/rooms/(?P[^/]*)" - "/read_marker$" - ) +class ReadMarkerRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/rooms/(?P[^/]*)/read_marker$") def __init__(self, hs): - super(ReceiptRestServlet, self).__init__() + super(ReadMarkerRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() self.receipts_handler = hs.get_receipts_handler() @@ -40,7 +37,7 @@ class ReceiptRestServlet(RestServlet): self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks - def on_POST(self, request, room_id, receipt_type, event_id): + def on_POST(self, request, room_id): requester = yield self.auth.get_user_by_req(request) yield self.presence_handler.bump_presence_active_time(requester.user) @@ -68,4 +65,4 @@ class ReceiptRestServlet(RestServlet): def register_servlets(hs, http_server): - ReceiptRestServlet(hs).register(http_server) + ReadMarkerRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index 6310152560..12754c89ae 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -48,6 +48,7 @@ from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler +from synapse.handlers.read_marker import ReadMarkerHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -133,6 +134,7 @@ class HomeServer(object): 'receipts_handler', 'macaroon_generator', 'tcp_replication', + 'read_marker_handler', ] def __init__(self, hostname, **kwargs): @@ -291,6 +293,9 @@ class HomeServer(object): def build_receipts_handler(self): return ReceiptsHandler(self) + def build_read_marker_handler(self): + return ReadMarkerHandler(self) + def build_tcp_replication(self): raise NotImplementedError() -- cgit 1.5.1 From 6308ac45b08ff5bf7259f09a2a767212f3f97860 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:19:26 +0100 Subject: Move get_interested_remotes back to presence handler --- synapse/federation/transaction_queue.py | 41 ++++---------------------- synapse/handlers/presence.py | 52 +++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 38 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index eb361d904c..08ceda31a6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -25,7 +25,7 @@ from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id -from synapse.handlers.presence import format_user_presence_state +from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics import logging @@ -251,7 +251,10 @@ class TransactionQueue(object): # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled - self.pending_presence.update({state.user_id: state for state in states}) + self.pending_presence.update({ + state.user_id: state for state in states + if self.is_mine_id(state.user_id) + }) # We then handle the new pending presence in batches, first figuring # out the destinations we need to send each state to and then poking it @@ -283,40 +286,8 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ - # First we look up the rooms each user is in (as well as any explicit - # subscriptions), then for each distinct room we look up the remote - # hosts in those rooms. - room_ids_to_states = {} - users_to_states = {} - for state in states.itervalues(): - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted( - state.user_id, - ) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - hosts_and_states = [] - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) + hosts_and_states = yield get_interested_remotes(self.store, states) - # And now finally queue up new transactions for destinations, states in hosts_and_states: for destination in destinations: if not self.can_send_to(destination): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c1c0dd4d3d..685373ff28 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -633,9 +633,6 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - # TODO: de-dup hosts_to_states, as a single host might have multiple - # of same presence - defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks @@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): persist_and_notify = True return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_remotes(store, states): + """Given a list of presence states figure out which remote servers + should be sent which. + + All the presence states should be for local users only. + + Args: + store (DataStore) + states (list(UserPresenceState)) + + Returns: + Deferred list of ([destinations], [UserPresenceState]), where for + each row the list of UserPresenceState should be sent to each + destination + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + if not local_states: + continue + + hosts = yield store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + defer.returnValue(hosts_and_states) -- cgit 1.5.1 From 2be8a281d2aac8e2e3829b9aff6eb366506d22d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:28:24 +0100 Subject: Comments --- synapse/federation/send_queue.py | 14 +++++++------- synapse/handlers/presence.py | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index a12c18f4df..7e52a55eda 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,17 +55,17 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.presence_map = {} - self.presence_changed = sorteddict() + self.presence_map = {} # Pending presence map user_id -> UserPresenceState + self.presence_changed = sorteddict() # Stream position -> user_id - self.keyed_edu = {} - self.keyed_edu_changed = sorteddict() + self.keyed_edu = {} # (destination, key) -> EDU + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = sorteddict() + self.edus = sorteddict() # stream position -> Edu - self.failures = sorteddict() + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 self.pos_time = sorteddict() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 685373ff28..98e736be5b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -666,7 +666,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list): list(state) + hosts_to_states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1332,6 +1332,8 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ + hosts_and_states = [] # Final result to return + # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. @@ -1348,7 +1350,6 @@ def get_interested_remotes(store, states): for u in plist: users_to_states.setdefault(u, []).append(state) - hosts_and_states = [] for room_id, states in room_ids_to_states.items(): if not local_states: continue -- cgit 1.5.1 From 414522aed59aca9b711253aba98d15e2d59e14f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:30:02 +0100 Subject: Move get_interested_parties --- synapse/app/synchrotron.py | 5 ++-- synapse/handlers/presence.py | 69 ++++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 38 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b6f82abdc..e3fbf02c9c 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,7 @@ from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.handlers.presence import PresenceHandler +from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -172,7 +172,6 @@ class SynchrotronPresence(object): get_states = PresenceHandler.get_states.__func__ get_state = PresenceHandler.get_state.__func__ - _get_interested_parties = PresenceHandler._get_interested_parties.__func__ current_state_for_users = PresenceHandler.current_state_for_users.__func__ def user_syncing(self, user_id, affect_presence): @@ -206,7 +205,7 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 98e736be5b..b9ce997a94 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -610,31 +610,6 @@ class PresenceHandler(object): defer.returnValue(states) - @defer.inlineCallbacks - def _get_interested_parties(self, states): - """Given a list of states return which entities (rooms, users, servers) - are interested in the given states. - - Returns: - 2-tuple: `(room_ids_to_states, users_to_states)`, - with each item being a dict of `entity_name` -> `[UserPresenceState]` - """ - room_ids_to_states = {} - users_to_states = {} - for state in states: - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - # Always notify self - users_to_states.setdefault(state.user_id, []).append(state) - - defer.returnValue((room_ids_to_states, users_to_states)) - @defer.inlineCallbacks def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to @@ -642,7 +617,7 @@ class PresenceHandler(object): """ stream_id, max_token = yield self.store.update_presence(states) - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -654,7 +629,7 @@ class PresenceHandler(object): @defer.inlineCallbacks def notify_for_states(self, state, stream_id): - parties = yield self._get_interested_parties([state]) + parties = yield get_interested_parties(self.store, [state]) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -1316,6 +1291,36 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_parties(store, states): + """Given a list of states return which entities (rooms, users) + are interested in the given states. + + Args: + states (list(UserPresenceState)) + + Returns: + 2-tuple: `(room_ids_to_states, users_to_states)`, + with each item being a dict of `entity_name` -> `[UserPresenceState]` + """ + room_ids_to_states = {} + users_to_states = {} + for state in states: + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted(state.user_id) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + # Always notify self + users_to_states.setdefault(state.user_id, []).append(state) + + defer.returnValue((room_ids_to_states, users_to_states)) + + @defer.inlineCallbacks def get_interested_remotes(store, states): """Given a list of presence states figure out which remote servers @@ -1351,17 +1356,11 @@ def get_interested_remotes(store, states): users_to_states.setdefault(u, []).append(state) for room_id, states in room_ids_to_states.items(): - if not local_states: - continue - hosts = yield store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) + hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.items(): - if not local_states: - continue - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) + hosts_and_states.append(([host], states)) defer.returnValue(hosts_and_states) -- cgit 1.5.1 From 012742302748a231593222beffc5ceff61966ffc Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:07:07 +0100 Subject: flake8 --- synapse/handlers/read_marker.py | 3 +-- synapse/rest/client/v2_alpha/account_data.py | 3 ++- synapse/rest/client/v2_alpha/read_marker.py | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 021faff376..a97f6b9a79 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -17,14 +17,13 @@ from ._base import BaseHandler from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.util.async import Linearizer -from synapse.types import get_domain_from_id from synapse.api.errors import SynapseError import logging logger = logging.getLogger(__name__) + class ReadMarkerHandler(BaseHandler): def __init__(self, hs): super(ReadMarkerHandler, self).__init__(hs) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index a846ab1dc1..56cd347e78 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -83,7 +83,8 @@ class RoomAccountDataServlet(RestServlet): body = parse_json_object_from_request(request) if account_data_type == "m.read_marker": - raise SynapseError(405, + raise SynapseError( + 405, "Cannot set m.read_marker through this API. " "Use /rooms/!roomId:server.name/read_marker" ) diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index 49ada9c047..a568a9252a 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -15,7 +15,6 @@ from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.http.servlet import RestServlet, parse_json_object_from_request from ._base import client_v2_patterns @@ -45,7 +44,7 @@ class ReadMarkerRestServlet(RestServlet): body = parse_json_object_from_request(request) if "m.read" in body: - read_event_id = body["m.read"]; + read_event_id = body["m.read"] yield self.receipts_handler.received_client_receipt( room_id, "m.read", -- cgit 1.5.1 From 131485ef665b4dc714a9624711c18bb67f264aca Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:33:51 +0100 Subject: Copyright --- synapse/handlers/read_marker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index a97f6b9a79..19decc2c63 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.5.1 From 73880268ef7184d17ec369074d1d0d72de56f33c Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:34:09 +0100 Subject: Refactor event ordering check to events store --- synapse/handlers/read_marker.py | 32 ++++---------------------------- synapse/storage/events.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 28 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 19decc2c63..c3882313e6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -42,41 +42,17 @@ class ReadMarkerHandler(BaseHandler): """ # Get ordering for existing read marker - with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) existing_read_marker = account_data["m.read_marker"] should_update = True - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - if not res: - raise SynapseError(404, 'Event does not exist') - if existing_read_marker: - new_to = int(res["topological_ordering"]) - new_so = int(res["stream_ordering"]) - - # Get ordering for existing read marker - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker['marker']}, - allow_none=True + should_update = yield self.store.is_event_after( + existing_read_marker['marker'], + event_id ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - # Prevent updating if the existing marker is ahead in the stream - if existing_to > new_to: - should_update = False - elif existing_to == new_to and existing_so >= new_so: - should_update = False if should_update: content = { diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 64fe937bdc..3c6df5c2d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2159,6 +2159,34 @@ class EventsStore(SQLBaseStore): ] ) + @defer.inlineCallbacks + def is_event_after(self, event_id1, event_id2): + is_after = True + + to_1, so_1 = yield self._get_event_ordering(event_id1) + to_2, so_2 = yield self._get_event_ordering(event_id2) + + # Prevent updating if the existing marker is ahead in the stream + if to_1 > to_2: + is_after = False + elif to_1 == to_2 and so_1 >= so_2: + is_after = False + + defer.returnValue(is_after) + + @defer.inlineCallbacks + def _get_event_ordering(self, event_id): + res = yield self._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + if not res: + raise SynapseError(404, "Could not find event %s" % (event_id,)) + + defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", -- cgit 1.5.1 From 867822fa1e588da67508bb6a094c133c48e5d129 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:36:04 +0100 Subject: flake8 --- synapse/handlers/read_marker.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index c3882313e6..ee8cfc1c7a 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -18,7 +18,6 @@ from ._base import BaseHandler from twisted.internet import defer from synapse.util.async import Linearizer -from synapse.api.errors import SynapseError import logging logger = logging.getLogger(__name__) -- cgit 1.5.1 From 77fb2b72aeccfa94a47b9088c885ed103edbfc60 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 09:47:29 +0100 Subject: Handle no previous RM --- synapse/handlers/read_marker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index ee8cfc1c7a..43489b88bf 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -43,7 +43,10 @@ class ReadMarkerHandler(BaseHandler): # Get ordering for existing read marker with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data["m.read_marker"] + + existing_read_marker = None + if "m.read_marker" in account_data: + existing_read_marker = account_data["m.read_marker"] should_update = True -- cgit 1.5.1 From c7ddb5ef7ac3dc7370010ee6685497ee73f46fa2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:11:43 +0100 Subject: Reuse get_interested_parties --- synapse/federation/transaction_queue.py | 6 +++--- synapse/handlers/presence.py | 21 +++++---------------- 2 files changed, 8 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 260a472255..feb1605019 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -269,13 +269,13 @@ class TransactionQueue(object): self._processing_pending_presence = True try: while True: - states = self.pending_presence + states_map = self.pending_presence self.pending_presence = {} - if not states: + if not states_map: break - yield self._process_presence_inner(states) + yield self._process_presence_inner(states_map.values()) finally: self._processing_pending_presence = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b9ce997a94..f3707afcd0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -641,7 +641,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list(UserPresenceState)) + states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1337,29 +1337,18 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ - hosts_and_states = [] # Final result to return + hosts_and_states = [] # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. - room_ids_to_states = {} - users_to_states = {} - for state in states.itervalues(): - room_ids = yield store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield store.get_presence_list_observers_accepted( - state.user_id, - ) - for u in plist: - users_to_states.setdefault(u, []).append(state) + room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.items(): + for room_id, states in room_ids_to_states.iteritems(): hosts = yield store.get_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.items(): + for user_id, states in users_to_states.iteritems(): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) -- cgit 1.5.1 From 122cd52ce451a2c182d23bb1a1a3a33ccaf307fc Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 10:48:32 +0100 Subject: Remove comment, simplify null-guard --- synapse/handlers/read_marker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 43489b88bf..7f10dd8b45 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -40,13 +40,10 @@ class ReadMarkerHandler(BaseHandler): the read marker has changed. """ - # Get ordering for existing read marker with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = None - if "m.read_marker" in account_data: - existing_read_marker = account_data["m.read_marker"] + existing_read_marker = account_data.get("m.read_marker", None) should_update = True -- cgit 1.5.1 From 69a18514e94a8fae928e80bae17c701186686d12 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 10:50:37 +0100 Subject: Only notify user, not entire room --- synapse/handlers/read_marker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 7f10dd8b45..800240b8a9 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -60,6 +60,4 @@ class ReadMarkerHandler(BaseHandler): max_id = yield self.store.add_account_data_to_room( user_id, room_id, "m.read_marker", content ) - self.notifier.on_new_event( - "account_data_key", max_id, users=[user_id], rooms=[room_id] - ) + self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) -- cgit 1.5.1 From b9557064bf6003a666b8fb6813dd3618fe9e48b4 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 14:36:20 +0100 Subject: Simplify is_event_after logic --- synapse/handlers/read_marker.py | 5 +++-- synapse/storage/events.py | 13 +++---------- 2 files changed, 6 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 800240b8a9..3f46a16b90 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -48,9 +48,10 @@ class ReadMarkerHandler(BaseHandler): should_update = True if existing_read_marker: + # Only update if the new marker is ahead in the stream should_update = yield self.store.is_event_after( - existing_read_marker['marker'], - event_id + event_id, + existing_read_marker['marker'] ) if should_update: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 702bd64b2e..221cb563d8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2161,18 +2161,11 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): - is_after = True - + """Returns True if event_id1 is after event_id2 in the stream + """ to_1, so_1 = yield self._get_event_ordering(event_id1) to_2, so_2 = yield self._get_event_ordering(event_id2) - - # Prevent updating if the existing marker is ahead in the stream - if to_1 > to_2: - is_after = False - elif to_1 == to_2 and so_1 >= so_2: - is_after = False - - defer.returnValue(is_after) + defer.returnValue(to_1 > to_2 and so_1 > so_2) @defer.inlineCallbacks def _get_event_ordering(self, event_id): -- cgit 1.5.1 From 3fb8784c9243da32d680b21773f35d885e8be480 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 18 Apr 2017 17:46:15 +0100 Subject: m.read_marker -> m.fully_read (#2128) Also: - change the REST endpoint to have a "S" on the end (so it's now /read_markers) - change the content of the m.read_up_to event to have the key "event_id" instead of "marker". --- synapse/handlers/read_marker.py | 8 ++++---- synapse/rest/client/v2_alpha/account_data.py | 6 +++--- synapse/rest/client/v2_alpha/read_marker.py | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 3f46a16b90..b5b0303d54 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -43,7 +43,7 @@ class ReadMarkerHandler(BaseHandler): with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data.get("m.read_marker", None) + existing_read_marker = account_data.get("m.fully_read", None) should_update = True @@ -51,14 +51,14 @@ class ReadMarkerHandler(BaseHandler): # Only update if the new marker is ahead in the stream should_update = yield self.store.is_event_after( event_id, - existing_read_marker['marker'] + existing_read_marker['event_id'] ) if should_update: content = { - "marker": event_id + "event_id": event_id } max_id = yield self.store.add_account_data_to_room( - user_id, room_id, "m.read_marker", content + user_id, room_id, "m.fully_read", content ) self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 1f9f42e661..0e0a187efd 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -82,11 +82,11 @@ class RoomAccountDataServlet(RestServlet): body = parse_json_object_from_request(request) - if account_data_type == "m.read_marker": + if account_data_type == "m.fully_read": raise SynapseError( 405, - "Cannot set m.read_marker through this API." - " Use /rooms/!roomId:server.name/read_marker" + "Cannot set m.fully_read through this API." + " Use /rooms/!roomId:server.name/read_markers" ) max_id = yield self.store.add_account_data_to_room( diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index 95e8e39974..2f8784fe06 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) class ReadMarkerRestServlet(RestServlet): - PATTERNS = client_v2_patterns("/rooms/(?P[^/]*)/read_marker$") + PATTERNS = client_v2_patterns("/rooms/(?P[^/]*)/read_markers$") def __init__(self, hs): super(ReadMarkerRestServlet, self).__init__() @@ -51,7 +51,7 @@ class ReadMarkerRestServlet(RestServlet): event_id=read_event_id ) - read_marker_event_id = body.get("m.read_marker", None) + read_marker_event_id = body.get("m.fully_read", None) if read_marker_event_id: yield self.read_marker_handler.received_client_read_marker( room_id, -- cgit 1.5.1 From 736b9a478478f82ac27a9a6a4899aed66b0b64a8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 00:51:01 +0100 Subject: Remove redundant function inline `reject_remote_invite`, which only existed to make tracing the callflow more difficult. --- synapse/handlers/room_member.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2052d6d05f..08fc6d6783 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -139,13 +139,6 @@ class RoomMemberHandler(BaseHandler): ) yield user_joined_room(self.distributor, user, room_id) - def reject_remote_invite(self, user_id, room_id, remote_room_hosts): - return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - user_id - ) - @defer.inlineCallbacks def update_membership( self, @@ -286,10 +279,12 @@ class RoomMemberHandler(BaseHandler): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - + fed_handler = self.hs.get_handlers().federation_handler try: - ret = yield self.reject_remote_invite( - target.to_string(), room_id, remote_room_hosts + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), ) defer.returnValue(ret) except SynapseError as e: -- cgit 1.5.1 From 838810b76af338ecd5d2a34409b4e6ca78daaddb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 01:22:07 +0100 Subject: Broaden the conditions for locally_rejecting invites The logic for marking invites as locally rejected was all well and good, but didn't happen when the remote server returned a 500, or wasn't reachable, or had no DNS, or whatever. Just expand the except clause to catch everything. Fixes https://github.com/matrix-org/synapse/issues/761. --- synapse/handlers/room_member.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 08fc6d6783..28b2c80a93 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -287,7 +287,13 @@ class RoomMemberHandler(BaseHandler): target.to_string(), ) defer.returnValue(ret) - except SynapseError as e: + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # logger.warn("Failed to reject invite: %s", e) yield self.store.locally_reject_invite( -- cgit 1.5.1 From 0cdb32fc43aa2edbdd60fc9f092632a5edf8b348 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 01:24:17 +0100 Subject: Remove redundant try/except clauses The `except SynapseError` clauses were pointless because the wrapped functions would never throw a `SynapseError` (they either throw a `CodeMessageException` or a `RuntimeError`). The `except CodeMessageException` is now also pointless because the caller treats all exceptions equally, so we may as well just throw the `CodeMessageException`. --- synapse/handlers/federation.py | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2d9126dd86..52be5a402d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1090,19 +1090,13 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - try: - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - event = self._sign_event(event) - except SynapseError: - raise - except CodeMessageException as e: - logger.warn("Failed to reject invite: %s", e) - raise SynapseError(500, "Failed to reject invite") + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + event = self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1112,16 +1106,10 @@ class FederationHandler(BaseHandler): except ValueError: pass - try: - yield self.replication_layer.send_leave( - target_hosts, - event - ) - except SynapseError: - raise - except CodeMessageException as e: - logger.warn("Failed to reject invite: %s", e) - raise SynapseError(500, "Failed to reject invite") + yield self.replication_layer.send_leave( + target_hosts, + event + ) context = yield self.state_handler.compute_event_context(event) -- cgit 1.5.1 From a90a0f5c8a38ff7f99a12dd436b75680d3fee747 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 21 Apr 2017 11:32:48 +0100 Subject: Propagate errors sensibly from proxied IS requests When we're proxying Matrix endpoints, parse out Matrix error responses and turn them into SynapseErrors so they can be propagated sensibly upstream. --- synapse/handlers/identity.py | 10 +++++----- synapse/http/client.py | 30 ++++++++++++++++++++++++++++++ synapse/server.py | 8 +++++++- 3 files changed, 42 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6a53c5eb47..41f978d990 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -35,7 +35,7 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) - self.http_client = hs.get_simple_http_client() + self.proxy_client = hs.get_matrix_proxy_client() self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers) self.trust_any_id_server_just_for_testing_do_not_use = ( @@ -83,7 +83,7 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield self.http_client.get_json( + data = yield self.proxy_client.get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" @@ -118,7 +118,7 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield self.http_client.post_urlencoded_get_json( + data = yield self.proxy_client.post_urlencoded_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), @@ -151,7 +151,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.http_client.post_json_get_json( + data = yield self.proxy_client.post_json_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" @@ -185,7 +185,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.http_client.post_json_get_json( + data = yield self.proxy_client.post_json_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken" diff --git a/synapse/http/client.py b/synapse/http/client.py index ca2f770f5d..c8b76b2191 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -145,6 +145,9 @@ class SimpleHttpClient(object): body = yield preserve_context_over_fn(readBody, response) + if response.code / 100 != 2: + raise CodeMessageException(response.code, body) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks @@ -306,6 +309,33 @@ class SimpleHttpClient(object): defer.returnValue((length, headers, response.request.absoluteURI, response.code)) +class MatrixProxyClient(object): + """ + An HTTP client that proxies other Matrix endpoints, ie. if the remote endpoint + returns Matrix-style error response, this will raise the appropriate SynapseError + """ + def __init__(self, hs): + self.simpleHttpClient = SimpleHttpClient(hs) + + @defer.inlineCallbacks + def post_json_get_json(self, uri, post_json): + try: + result = yield self.simpleHttpClient.post_json_get_json(uri, post_json) + defer.returnValue(result) + except CodeMessageException as cme: + ex = None + try: + errbody = json.loads(cme.msg) + errcode = errbody['errcode'] + errtext = errbody['error'] + ex = SynapseError(cme.code, errtext, errcode) + except: + pass + if ex is not None: + raise ex + raise cme + + # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/server.py b/synapse/server.py index 6310152560..f73aef19c8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -48,7 +48,9 @@ from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler -from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory +from synapse.http.client import ( + SimpleHttpClient, InsecureInterceptableContextFactory, MatrixProxyClient +) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier from synapse.push.pusherpool import PusherPool @@ -127,6 +129,7 @@ class HomeServer(object): 'filtering', 'http_client_context_factory', 'simple_http_client', + 'matrix_proxy_client', 'media_repository', 'federation_transport_client', 'federation_sender', @@ -188,6 +191,9 @@ class HomeServer(object): def build_simple_http_client(self): return SimpleHttpClient(self) + def build_matrix_proxy_client(self): + return MatrixProxyClient(self) + def build_v1auth(self): orf = Auth(self) # Matrix spec makes no reference to what HTTP status code is returned, -- cgit 1.5.1 From 1a9255c12eb73245bdbb626a1a0cad2fbe967caa Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 25 Apr 2017 19:30:55 +0100 Subject: Use CodeMessageException subclass instead Parse json errors from get_json client methods and throw special errors. --- synapse/api/errors.py | 11 ++++++++ synapse/handlers/identity.py | 29 +++++++++++++------- synapse/http/client.py | 64 ++++++++++++++------------------------------ synapse/server.py | 8 +----- 4 files changed, 51 insertions(+), 61 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 6fbd5d6876..d0dfa959dc 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -66,6 +66,17 @@ class CodeMessageException(RuntimeError): return cs_error(self.msg) +class MatrixCodeMessageException(CodeMessageException): + """An error from a general matrix endpoint, eg. from a proxied Matrix API call. + + Attributes: + errcode (str): Matrix error code e.g 'M_FORBIDDEN' + """ + def __init__(self, code, msg, errcode=Codes.UNKNOWN): + super(MatrixCodeMessageException, self).__init__(code, msg) + self.errcode = errcode + + class SynapseError(CodeMessageException): """A base exception type for matrix errors which have an errcode and error message (as well as an HTTP status code). diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 41f978d990..8b407a307c 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import ( - CodeMessageException + MatrixCodeMessageException, CodeMessageException ) from ._base import BaseHandler from synapse.util.async import run_on_reactor @@ -35,7 +35,7 @@ class IdentityHandler(BaseHandler): def __init__(self, hs): super(IdentityHandler, self).__init__(hs) - self.proxy_client = hs.get_matrix_proxy_client() + self.http_client = hs.get_simple_http_client() self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers) self.trust_any_id_server_just_for_testing_do_not_use = ( @@ -83,13 +83,16 @@ class IdentityHandler(BaseHandler): data = {} try: - data = yield self.proxy_client.get_json( - "https://%s%s" % ( + data = yield self.http_client.get_json( + "http://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" ), {'sid': creds['sid'], 'client_secret': client_secret} ) + except MatrixCodeMessageException as e: + logger.info("getValidated3pid failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: data = json.loads(e.msg) @@ -118,8 +121,8 @@ class IdentityHandler(BaseHandler): raise SynapseError(400, "No client_secret in creds") try: - data = yield self.proxy_client.post_urlencoded_get_json( - "https://%s%s" % ( + data = yield self.http_client.post_urlencoded_get_json( + "http://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), { @@ -151,14 +154,17 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.proxy_client.post_json_get_json( - "https://%s%s" % ( + data = yield self.http_client.post_json_get_json( + "http://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" ), params ) defer.returnValue(data) + except MatrixCodeMessageException as e: + logger.info("Proxied requestToken failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e @@ -185,14 +191,17 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.proxy_client.post_json_get_json( - "https://%s%s" % ( + data = yield self.http_client.post_json_get_json( + "http://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken" ), params ) defer.returnValue(data) + except MatrixCodeMessageException as e: + logger.info("Proxied requestToken failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e diff --git a/synapse/http/client.py b/synapse/http/client.py index df8c3e3c2c..57a49b2827 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -16,7 +16,7 @@ from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( - CodeMessageException, SynapseError, Codes, + CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) from synapse.util.logcontext import preserve_context_over_fn import synapse.metrics @@ -145,8 +145,10 @@ class SimpleHttpClient(object): body = yield preserve_context_over_fn(readBody, response) - if response.code / 100 >= 4: - raise CodeMessageException(response.code, body) + if 200 <= response.code < 300: + defer.returnValue(json.loads(body)) + else: + raise self._exceptionFromFailedRequest(response, body) defer.returnValue(json.loads(body)) @@ -168,7 +170,11 @@ class SimpleHttpClient(object): error message. """ body = yield self.get_raw(uri, args) - defer.returnValue(json.loads(body)) + + if 200 <= response.code < 300: + defer.returnValue(json.loads(body)) + else: + raise self._exceptionFromFailedRequest(response, body) @defer.inlineCallbacks def put_json(self, uri, json_body, args={}): @@ -249,6 +255,16 @@ class SimpleHttpClient(object): else: raise CodeMessageException(response.code, body) + def _exceptionFromFailedRequest(self, response, body): + try: + jsonBody = json.loads(body) + errcode = jsonBody['errcode'] + error = jsonBody['error'] + return MatrixCodeMessageException(response.code, error, errcode) + except e: + print e + return CodeMessageException(response.code, body) + # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. @@ -309,46 +325,6 @@ class SimpleHttpClient(object): defer.returnValue((length, headers, response.request.absoluteURI, response.code)) -class MatrixProxyClient(object): - """ - An HTTP client that proxies other Matrix endpoints, ie. if the remote endpoint - returns Matrix-style error response, this will raise the appropriate SynapseError - """ - def __init__(self, hs): - self.simpleHttpClient = SimpleHttpClient(hs) - - @defer.inlineCallbacks - def post_json_get_json(self, uri, post_json): - try: - result = yield self.simpleHttpClient.post_json_get_json(uri, post_json) - defer.returnValue(result) - except CodeMessageException as cme: - ex = self._tryGetMatrixError(cme) - if ex is not None: - raise ex - raise cme - - @defer.inlineCallbacks - def get_json(self, uri, args={}): - try: - result = yield self.simpleHttpClient.get_json(uri, args) - defer.returnValue(result) - except CodeMessageException as cme: - ex = self._tryGetMatrixError(cme) - if ex is not None: - raise ex - raise cme - - def _tryGetMatrixError(self, codeMessageException): - try: - errbody = json.loads(codeMessageException.msg) - errcode = errbody['errcode'] - errtext = errbody['error'] - return SynapseError(codeMessageException.code, errtext, errcode) - except: - return None - - # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/server.py b/synapse/server.py index 8325d77a7a..12754c89ae 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -49,9 +49,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler -from synapse.http.client import ( - SimpleHttpClient, InsecureInterceptableContextFactory, MatrixProxyClient -) +from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier from synapse.push.pusherpool import PusherPool @@ -130,7 +128,6 @@ class HomeServer(object): 'filtering', 'http_client_context_factory', 'simple_http_client', - 'matrix_proxy_client', 'media_repository', 'federation_transport_client', 'federation_sender', @@ -193,9 +190,6 @@ class HomeServer(object): def build_simple_http_client(self): return SimpleHttpClient(self) - def build_matrix_proxy_client(self): - return MatrixProxyClient(self) - def build_v1auth(self): orf = Auth(self) # Matrix spec makes no reference to what HTTP status code is returned, -- cgit 1.5.1 From 82ae0238f9ff15a5a5445cb681ab0cb507d865f3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 26 Apr 2017 11:43:16 +0100 Subject: Revert accidental commit --- synapse/handlers/identity.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 8b407a307c..9efcdff1d6 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -84,7 +84,7 @@ class IdentityHandler(BaseHandler): data = {} try: data = yield self.http_client.get_json( - "http://%s%s" % ( + "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid" ), @@ -122,7 +122,7 @@ class IdentityHandler(BaseHandler): try: data = yield self.http_client.post_urlencoded_get_json( - "http://%s%s" % ( + "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/3pid/bind" ), { @@ -155,7 +155,7 @@ class IdentityHandler(BaseHandler): try: data = yield self.http_client.post_json_get_json( - "http://%s%s" % ( + "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" ), @@ -192,7 +192,7 @@ class IdentityHandler(BaseHandler): try: data = yield self.http_client.post_json_get_json( - "http://%s%s" % ( + "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken" ), -- cgit 1.5.1 From 34e682d3855562a4ca6e32f366a2ecfa1c684a3a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Apr 2017 16:18:08 +0100 Subject: Fix invite state to always include all events --- synapse/events/utils.py | 20 +++++++++++++++++++- synapse/handlers/message.py | 9 +++------ synapse/rest/client/v2_alpha/sync.py | 2 ++ 3 files changed, 24 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 5bbaef8187..7ca7796558 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -225,7 +225,22 @@ def format_event_for_client_v2_without_room_id(d): def serialize_event(e, time_now_ms, as_client_event=True, event_format=format_event_for_client_v1, - token_id=None, only_event_fields=None): + token_id=None, only_event_fields=None, is_invite=False): + """Serialize event for clients + + Args: + e (EventBase) + time_now_ms (int) + as_client_event (bool) + event_format + token_id + only_event_fields + is_invite (bool): Whether this is an invite that is being sent to the + invitee + + Returns: + dict + """ # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -260,4 +275,7 @@ def serialize_event(e, time_now_ms, as_client_event=True, raise TypeError("only_event_fields must be a list of strings") d = only_fields(d, only_event_fields) + if not is_invite: + d["unsigned"].pop("invite_room_state", None) + return d diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 348056add5..82a2ade1f6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -531,9 +531,9 @@ class MessageHandler(BaseHandler): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.items() + for k, e_id in context.current_state_ids.iteritems() if k[0] in self.hs.config.room_invite_state_types - or k[0] == EventTypes.Member and k[1] == event.sender + or k == (EventTypes.Member, event.sender) ] state_to_include = yield self.store.get_events(state_to_include_ids) @@ -545,7 +545,7 @@ class MessageHandler(BaseHandler): "content": e.content, "sender": e.sender, } - for e in state_to_include.values() + for e in state_to_include.itervalues() ] invitee = UserID.from_string(event.state_key) @@ -618,6 +618,3 @@ class MessageHandler(BaseHandler): ) preserve_fn(_notify)() - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a7a9e0a794..b5bcfade70 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -250,9 +250,11 @@ class SyncRestServlet(RestServlet): """ invited = {} for room in rooms: + logger.info("invite: %r", room.invite) invite = serialize_event( room.invite, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_room_id, + is_invite=True, ) unsigned = dict(invite.get("unsigned", {})) invite["unsigned"] = unsigned -- cgit 1.5.1 From e42b4ebf0f2e1a1cfcacca0e9939f5791baf13ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Apr 2017 14:38:21 +0100 Subject: Add some extra logging for edge cases of federation --- synapse/handlers/federation.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 52be5a402d..be3ffd01a7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -171,6 +171,12 @@ class FederationHandler(BaseHandler): yield self._get_missing_events_for_pdu( origin, pdu, prevs, min_depth ) + elif prevs - seen: + logger.info( + "Not fetching %d missing events for room %r,event %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, + list(prevs - seen)[:5], + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) @@ -232,8 +238,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + "Missing %d events for room %r pdy %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] ) # XXX: we set timeout to 10s to help workaround @@ -265,11 +271,17 @@ class FederationHandler(BaseHandler): timeout=10000, ) + logger.info( + "Got %d events: %r...", + len(missing_events), [e.event_id for e in missing_events[:5]] + ) + # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for e in missing_events: + logger.info("Handling found event %s", e.event_id) yield self.on_receive_pdu( origin, e, @@ -279,6 +291,14 @@ class FederationHandler(BaseHandler): have_seen = yield self.store.have_events( [ev for ev, _ in pdu.prev_events] ) + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d prev events for %s: %r...", + len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] + ) + else: + logger.info("Found all missing prev events for %s", pdu.event_id) defer.returnValue(have_seen) @log_function -- cgit 1.5.1 From 87ae59f5e9d096f7cb9cbe99e0b9c85bce1c9925 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Apr 2017 15:16:21 +0100 Subject: Typo --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index be3ffd01a7..ebbf844489 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -238,7 +238,7 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r pdy %s: %r...", + "Missing %d events for room %r pdu %s: %r...", len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] ) -- cgit 1.5.1 From 3a9f5bf6ddb9393b68d8da9f04b2b1162720da7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 11:26:46 +0100 Subject: Don't fetch state for missing events that we fetched --- synapse/handlers/federation.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ebbf844489..33321699a6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -171,6 +171,12 @@ class FederationHandler(BaseHandler): yield self._get_missing_events_for_pdu( origin, pdu, prevs, min_depth ) + + # Update the set of things we've seen after trying to + # fetch the missing stuff + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) elif prevs - seen: logger.info( "Not fetching %d missing events for room %r,event %s: %r...", -- cgit 1.5.1 From 9b147cd73049c6222fa6c3b45dfb2a360de93cc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 11:55:25 +0100 Subject: Remove unncessary call in _get_missing_events_for_pdu --- synapse/handlers/federation.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 33321699a6..43d1ec859d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -177,6 +177,17 @@ class FederationHandler(BaseHandler): have_seen = yield self.store.have_events( [ev for ev, _ in pdu.prev_events] ) + + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d prev events for %s: %r...", + len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] + ) + else: + logger.info( + "Found all missing prev events for %s", pdu.event_id + ) elif prevs - seen: logger.info( "Not fetching %d missing events for room %r,event %s: %r...", @@ -294,19 +305,6 @@ class FederationHandler(BaseHandler): get_missing=False ) - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - seen = set(have_seen.keys()) - if prevs - seen: - logger.info( - "Still missing %d prev events for %s: %r...", - len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] - ) - else: - logger.info("Found all missing prev events for %s", pdu.event_id) - defer.returnValue(have_seen) - @log_function @defer.inlineCallbacks def _process_received_pdu(self, origin, pdu, state, auth_chain): -- cgit 1.5.1 From 2347efc065912a0dca6fd0611012b853d62e6629 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 12:46:53 +0100 Subject: Fixup --- synapse/handlers/federation.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43d1ec859d..be2ea0cd56 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -174,11 +174,9 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = set(have_seen.iterkeys()) if prevs - seen: logger.info( "Still missing %d prev events for %s: %r...", @@ -231,19 +229,15 @@ class FederationHandler(BaseHandler): Args: origin (str): Origin of the pdu. Will be called to get the missing events pdu: received pdu - prevs (str[]): List of event ids which we are missing + prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. - - Returns: - Deferred: updated have_seen dictionary """ # We recalculate seen, since it may have changed. have_seen = yield self.store.have_events(prevs) seen = set(have_seen.keys()) if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) + return latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id -- cgit 1.5.1 From 3e5a62ecd8839fbfb56aa33b92127822a053ef6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 11:36:11 +0100 Subject: Add more granular event send metrics --- synapse/events/snapshot.py | 3 +++ synapse/handlers/message.py | 10 ++++++++-- synapse/handlers/room_member.py | 1 + synapse/rest/client/v1/room.py | 1 + synapse/storage/events.py | 16 ++++++++++++++++ tests/storage/event_injector.py | 4 ++-- tests/storage/test_events.py | 2 +- 7 files changed, 32 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 6be18880b9..e9a732ff03 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -50,6 +50,7 @@ class EventContext(object): "prev_group", "delta_ids", "prev_state_events", + "app_service", ] def __init__(self): @@ -68,3 +69,5 @@ class EventContext(object): self.delta_ids = None self.prev_state_events = None + + self.app_service = None diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 82a2ade1f6..57265c6d7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,7 +175,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -185,6 +186,7 @@ class MessageHandler(BaseHandler): Adds display names to Join membership events. Args: + requester event_dict (dict): An entire event token_id (str) txn_id (str) @@ -226,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + requester=requester, prev_event_ids=prev_event_ids, ) @@ -319,6 +322,7 @@ class MessageHandler(BaseHandler): See self.create_event and self.send_nonmember_event. """ event, context = yield self.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id @@ -416,7 +420,7 @@ class MessageHandler(BaseHandler): @measure_func("_create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): + def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -456,6 +460,8 @@ class MessageHandler(BaseHandler): state_handler = self.state_handler context = yield state_handler.compute_event_context(builder) + if requester: + context.app_service = requester.app_service if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 28b2c80a93..ab87632d99 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler): content["kind"] = "guest" event, context = yield msg_handler.create_event( + requester, { "type": EventTypes.Member, "content": content, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c376ab8fd7..cd388770c8 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): else: msg_handler = self.handlers.message_handler event, context = yield msg_handler.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..98707d40ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.state import resolve_events from synapse.util.caches.descriptors import cached +from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) persist_event_counter = metrics.register_counter("persisted_events") +event_counter = metrics.register_counter( + "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] +) def encode_json(json_object): @@ -370,6 +374,18 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py index 38556da9a7..024ac15069 100644 --- a/tests/storage/event_injector.py +++ b/tests/storage/event_injector.py @@ -27,10 +27,10 @@ class EventInjector: self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def create_room(self, room): + def create_room(self, room, user): builder = self.event_builder_factory.new({ "type": EventTypes.Create, - "sender": "", + "sender": user.to_string(), "room_id": room.to_string(), "content": {}, }) diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 3762b38e37..14443b53bc 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase): # Create something to report room = RoomID.from_string("!abc123:test") user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room) + yield self.event_injector.create_room(room, user) self.base_event = yield self._get_last_stream_token() -- cgit 1.5.1 From ef862186dd6b04210a536cbe3126e018260f8467 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 10:06:43 +0100 Subject: Merge together redundant calculations/logging --- synapse/handlers/federation.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index be2ea0cd56..2af9849ed0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -175,14 +175,9 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) - if prevs - seen: - logger.info( - "Still missing %d prev events for %s: %r...", - len(prevs - seen), pdu.event_id, list(prevs - seen)[:5] - ) - else: + + if not prevs - seen: logger.info( "Found all missing prev events for %s", pdu.event_id ) @@ -193,8 +188,6 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) if prevs - seen: logger.info( "Still missing %d events for room %r: %r...", -- cgit 1.5.1 From f346048a6e9ac798b742d939a38e0cfe71475f38 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 10:34:10 +0100 Subject: Handle exceptions thrown in handling remote device list updates --- synapse/handlers/device.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c22f65ce5d..72915b85d8 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -430,7 +431,21 @@ class DeviceListEduUpdater(object): if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) - result = yield self.federation.query_user_devices(origin, user_id) + try: + result = yield self.federation.query_user_devices(origin, user_id) + except NotRetryingDestination: + logger.warn( + "Failed to handle device list update for %s," + " we're not retrying the remote", + user_id, + ) + return + except Exception: + logger.exception( + "Failed to handle device list update for %s", user_id + ) + return + stream_id = result["stream_id"] devices = result["devices"] yield self.store.update_remote_device_list_cache( -- cgit 1.5.1 From b843631d7157f0beb64db62a86c691369aa49b14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 10:59:32 +0100 Subject: Add comment and TODO --- synapse/handlers/device.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 72915b85d8..187af03fb1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -426,6 +426,8 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + # Given a list of updates we check if we need to resync. This + # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: @@ -434,6 +436,8 @@ class DeviceListEduUpdater(object): try: result = yield self.federation.query_user_devices(origin, user_id) except NotRetryingDestination: + # TODO: Remember that we are now out of sync and try again + # later logger.warn( "Failed to handle device list update for %s," " we're not retrying the remote", @@ -441,6 +445,8 @@ class DeviceListEduUpdater(object): ) return except Exception: + # TODO: Remember that we are now out of sync and try again + # later logger.exception( "Failed to handle device list update for %s", user_id ) -- cgit 1.5.1 From 7b222fc56e9854d1f8f084d996d9ca694e91dd6c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 11:14:09 +0100 Subject: Remove redundant reset of destination timers --- synapse/handlers/federation.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2af9849ed0..52d97dfbf3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -380,13 +380,6 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - # if we're receiving valid events from an origin, - # it's probably a good idea to mark it as not in retry-state - # for sending (although this is a bit of a leap) - retry_timings = yield self.store.get_destination_retry_timings(origin) - if retry_timings and retry_timings["retry_last_ts"]: - self.store.set_destination_retry_timings(origin, 0, 0) - room = yield self.store.get_room(event.room_id) if not room: -- cgit 1.5.1 From 653d90c1a529ff553d506ac806ab0403f34955ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 14:01:17 +0100 Subject: Comment --- synapse/handlers/device.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 187af03fb1..982cda3edf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -443,6 +443,12 @@ class DeviceListEduUpdater(object): " we're not retrying the remote", user_id, ) + # We abort on exceptions rather than accepting the update + # as otherwise synapse will 'forget' that its device list + # is out of date. If we bail then we will retry the resync + # next time we get a device list update for this user_id. + # This makes it more likely that the device lists will + # eventually become consistent. return except Exception: # TODO: Remember that we are now out of sync and try again -- cgit 1.5.1 From a7e9d8762ddbcea0fcb7ab87c2c4f4e0d91e639a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 18:26:54 +0100 Subject: Allow clients to upload one-time-keys with new sigs When a client retries a key upload, don't give an error if the signature has changed (but the key is the same). Fixes https://github.com/vector-im/riot-android/issues/1208, hopefully. --- synapse/handlers/e2e_keys.py | 70 ++++++++++++++++++++++----- synapse/storage/end_to_end_keys.py | 47 ++++++++++-------- tests/handlers/test_e2e_keys.py | 98 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 33 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..9d994a8f71 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -288,19 +288,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +302,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index c96dae352d..e00f31da2b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,7 +14,6 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json @@ -124,18 +123,24 @@ class EndToEndKeyStore(SQLBaseStore): return result @defer.inlineCallbacks - def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): - """Insert some new one time keys for a device. + def get_e2e_one_time_keys(self, user_id, device_id, key_ids): + """Retrieve a number of one-time keys for a user - Checks if any of the keys are already inserted, if they are then check - if they match. If they don't then we raise an error. + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + key_ids(list[str]): list of key ids (excluding algorithm) to + retrieve + + Returns: + deferred resolving to Dict[(str, str), str]: map from (algorithm, + key_id) to json string for key """ - # First we check if we have already persisted any of the keys. rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", - iterable=[key_id for _, key_id, _ in key_list], + iterable=key_ids, retcols=("algorithm", "key_id", "key_json",), keyvalues={ "user_id": user_id, @@ -144,20 +149,22 @@ class EndToEndKeyStore(SQLBaseStore): desc="add_e2e_one_time_keys_check", ) - existing_key_map = { + defer.returnValue({ (row["algorithm"], row["key_id"]): row["key_json"] for row in rows - } - - new_keys = [] # Keys that we need to insert - for algorithm, key_id, json_bytes in key_list: - ex_bytes = existing_key_map.get((algorithm, key_id), None) - if ex_bytes: - if json_bytes != ex_bytes: - raise SynapseError( - 400, "One time key with key_id %r already exists" % (key_id,) - ) - else: - new_keys.append((algorithm, key_id, json_bytes)) + }) + + @defer.inlineCallbacks + def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): + """Insert some new one time keys for a device. Errors if any of the + keys already exist. + + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + time_now(long): insertion time to record (ms since epoch) + new_keys(iterable[(str, str, str)]: keys to add - each a tuple of + (algorithm, key_id, key json) + """ def _add_e2e_one_time_keys(txn): # We are protected from race between lookup and insertion due to diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 878a54dc34..f10a80a8e1 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -14,6 +14,7 @@ # limitations under the License. import mock +from synapse.api import errors from twisted.internet import defer import synapse.api.errors @@ -44,3 +45,100 @@ class E2eKeysHandlerTestCase(unittest.TestCase): local_user = "@boris:" + self.hs.hostname res = yield self.handler.query_local_devices({local_user: None}) self.assertDictEqual(res, {local_user: {}}) + + @defer.inlineCallbacks + def test_reupload_one_time_keys(self): + """we should be able to re-upload the same keys""" + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + # we should be able to change the signature without a problem + keys["alg2:k2"]["signatures"]["k1"] = "sig2" + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + @defer.inlineCallbacks + def test_change_one_time_keys(self): + """attempts to change one-time-keys should be rejected""" + + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}, + ) + self.fail("No error when changing string key") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}, + ) + self.fail("No error when replacing dict key with string") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": {"alg1:k1": {"key": "key"}} + }, + ) + self.fail("No error when replacing string key with dict") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": { + "alg2:k2": { + "key": "key3", + "signatures": {"k1": "sig1"}, + } + }, + }, + ) + self.fail("No error when replacing dict key") + except errors.SynapseError: + pass -- cgit 1.5.1 From de042b3b885aba6b1508ca50e033fb7a95893553 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 19:01:39 +0100 Subject: Do some logging when one-time-keys get claimed might help us figure out if https://github.com/vector-im/riot-web/issues/3868 has happened. --- synapse/federation/federation_server.py | 10 ++++++++++ synapse/handlers/e2e_keys.py | 10 ++++++++++ tests/handlers/test_e2e_keys.py | 34 +++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bc20b9c201..51e3fdea06 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -440,6 +440,16 @@ class FederationServer(FederationBase): key_id: json.loads(json_bytes) } + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({"one_time_keys": json_result}) @defer.inlineCallbacks diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9d994a8f71..73921a5307 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -262,6 +262,16 @@ class E2eKeysHandler(object): for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index f10a80a8e1..19f5ed6bce 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -142,3 +142,37 @@ class E2eKeysHandlerTestCase(unittest.TestCase): self.fail("No error when replacing dict key") except errors.SynapseError: pass + + @unittest.DEBUG + @defer.inlineCallbacks + def test_claim_one_time_key(self): + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1} + }) + + res2 = yield self.handler.claim_one_time_keys({ + "one_time_keys": { + local_user: { + device_id: "alg1" + } + } + }, timeout=None) + self.assertEqual(res2, { + "failures": {}, + "one_time_keys": { + local_user: { + device_id: { + "alg1:k1": "key1" + } + } + } + }) -- cgit 1.5.1 From aedaba018f97202ee69b8715b34d6fc1bac39566 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 19:02:32 +0100 Subject: Replace some instances of preserve_context_over_deferred --- synapse/handlers/e2e_keys.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 73921a5307..668a90e495 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -145,7 +145,7 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache ])) @@ -257,7 +257,7 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries ])) -- cgit 1.5.1 From b990b2fce5bb96c1c8eebcb0525ffcd011a22556 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2017 11:05:43 +0100 Subject: Add per user ratelimiting overrides --- synapse/handlers/_base.py | 34 ++++++++++++++++++++++--- synapse/handlers/message.py | 16 +++--------- synapse/handlers/profile.py | 2 +- synapse/handlers/room.py | 2 +- synapse/storage/room.py | 36 ++++++++++++++++++++++++++- synapse/storage/schema/delta/41/ratelimit.sql | 22 ++++++++++++++++ 6 files changed, 93 insertions(+), 19 deletions(-) create mode 100644 synapse/storage/schema/delta/41/ratelimit.sql (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e83adc8339..faa5609c0c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,7 +53,20 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - def ratelimit(self, requester): + @defer.inlineCallbacks + def ratelimit(self, requester, update=True): + """Ratelimits requests. + + Args: + requester (Requester) + update (bool): Whether to record that a request is being processed. + Set to False when doing multiple checks for one request (e.g. + to check up front if we would reject the request), and set to + True for the last call for a given request. + + Raises: + LimitExceededError if the request should be ratelimited + """ time_now = self.clock.time() user_id = requester.user.to_string() @@ -67,10 +80,25 @@ class BaseHandler(object): if requester.app_service and not requester.app_service.is_rate_limited(): return + # Check if there is a per user override in the DB. + override = yield self.store.get_ratelimit_for_user(user_id) + if override: + # If overriden with a null Hz then ratelimiting has been entirely + # disabled for the user + if not override.messages_per_second: + return + + messages_per_second = override.messages_per_second + burst_count = override.burst_count + else: + messages_per_second = self.hs.config.rc_messages_per_second + burst_count = self.hs.config.rc_message_burst_count + allowed, time_allowed = self.ratelimiter.send_message( user_id, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, + msg_rate_hz=messages_per_second, + burst_count=burst_count, + update=update, ) if not allowed: raise LimitExceededError( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 57265c6d7d..196925edad 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError +from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -254,17 +254,7 @@ class MessageHandler(BaseHandler): # We check here if we are currently being rate limited, so that we # don't do unnecessary work. We check again just before we actually # send the event. - time_now = self.clock.time() - allowed, time_allowed = self.ratelimiter.send_message( - event.sender, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, - update=False, - ) - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now)), - ) + yield self.ratelimit(requester, update=False) user = UserID.from_string(event.sender) @@ -499,7 +489,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - self.ratelimit(requester) + yield self.ratelimit(requester) try: yield self.auth.check_from_context(event, context) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9bf638f818..7abee98dea 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): return - self.ratelimit(requester) + yield self.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99cb7db0db..d2a0d6520a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - self.ratelimit(requester) + yield self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: diff --git a/synapse/storage/room.py b/synapse/storage/room.py index e4c56cc175..5d543652bb 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from ._base import SQLBaseStore from .engines import PostgresEngine, Sqlite3Engine @@ -33,6 +33,11 @@ OpsLevel = collections.namedtuple( ("ban_level", "kick_level", "redact_level",) ) +RatelimitOverride = collections.namedtuple( + "RatelimitOverride", + ("messages_per_second", "burst_count",) +) + class RoomStore(SQLBaseStore): @@ -473,3 +478,32 @@ class RoomStore(SQLBaseStore): return self.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) + + @cachedInlineCallbacks(max_entries=10000) + def get_ratelimit_for_user(self, user_id): + """Check if there are any overrides for ratelimiting for the given + user + + Args: + user_id (str) + + Returns: + RatelimitOverride if there is an override, else None. If the contents + of RatelimitOverride are None or 0 then ratelimitng has been + disabled for that user entirely. + """ + row = yield self._simple_select_one( + table="ratelimit_override", + keyvalues={"user_id": user_id}, + retcols=("messages_per_second", "burst_count"), + allow_none=True, + desc="get_ratelimit_for_user", + ) + + if row: + defer.returnValue(RatelimitOverride( + messages_per_second=row["messages_per_second"], + burst_count=row["burst_count"], + )) + else: + defer.returnValue(None) diff --git a/synapse/storage/schema/delta/41/ratelimit.sql b/synapse/storage/schema/delta/41/ratelimit.sql new file mode 100644 index 0000000000..a194bf0238 --- /dev/null +++ b/synapse/storage/schema/delta/41/ratelimit.sql @@ -0,0 +1,22 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE ratelimit_override ( + user_id TEXT NOT NULL, + messages_per_second BIGINT, + burst_count BIGINT +); + +CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id); -- cgit 1.5.1 From 369195caa5f52fc67c2496507fc99fccf4b0ede8 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 10 May 2017 17:17:12 +0100 Subject: Modify register/available to be GET with query param - GET is now the method for register/available - a query parameter "username" is now used Also, empty usernames are now handled with an error message on registration or via register/available: `User ID cannot be empty` --- synapse/handlers/register.py | 7 +++++++ synapse/rest/client/v2_alpha/register.py | 9 ++++----- 2 files changed, 11 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 03c6a85fc6..dd84c5f5e9 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if len(localpart) == 0: + raise SynapseError( + 400, + "User ID cannot be empty", + Codes.INVALID_USERNAME + ) + if localpart[0] == '_': raise SynapseError( 400, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 6a7cd96ea5..1421c18152 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -21,7 +21,7 @@ from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import ( - RestServlet, parse_json_object_from_request, assert_params_in_request + RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string ) from synapse.util.msisdn import phone_number_to_msisdn @@ -142,15 +142,14 @@ class UsernameAvailabilityRestServlet(RestServlet): ) @defer.inlineCallbacks - def on_POST(self, request): + def on_GET(self, request): ip = self.hs.get_ip_from_request(request) with self.ratelimiter.ratelimit(ip) as wait_deferred: yield wait_deferred - body = parse_json_object_from_request(request) - assert_params_in_request(body, ['username']) + username = parse_string(request, "username", required=True) - yield self.registration_handler.check_username(body['username']) + yield self.registration_handler.check_username(username) defer.returnValue((200, {"available": True})) -- cgit 1.5.1 From ccad2ed824f1601d2cc29aafe5dff6ef46668f5e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 10 May 2017 17:34:30 +0100 Subject: Modify condition on empty localpart --- synapse/handlers/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index dd84c5f5e9..ee3a2269a8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -54,7 +54,7 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) - if len(localpart) == 0: + if not localpart: raise SynapseError( 400, "User ID cannot be empty", -- cgit 1.5.1 From ec5c4499f4ab24445c6df7310007353b466020ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 May 2017 14:46:16 +0100 Subject: Make presence use cached users/hosts in room --- synapse/federation/transaction_queue.py | 2 +- synapse/handlers/presence.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 695f1a7375..a15198e05d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -285,7 +285,7 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ - hosts_and_states = yield get_interested_remotes(self.store, states) + hosts_and_states = yield get_interested_remotes(self.store, states, self.state) for destinations, states in hosts_and_states: for destination in destinations: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f3707afcd0..c7c0b0a1e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -780,12 +780,12 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) self._push_to_remotes([state]) else: + user_ids = yield self.store.get_users_in_room(room_id) user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) @@ -1322,7 +1322,7 @@ def get_interested_parties(store, states): @defer.inlineCallbacks -def get_interested_remotes(store, states): +def get_interested_remotes(store, states, state_handler): """Given a list of presence states figure out which remote servers should be sent which. @@ -1345,7 +1345,7 @@ def get_interested_remotes(store, states): room_ids_to_states, users_to_states = yield get_interested_parties(store, states) for room_id, states in room_ids_to_states.iteritems(): - hosts = yield store.get_hosts_in_room(room_id) + hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.iteritems(): -- cgit 1.5.1 From 13f540ef1b94e6173bdd4f2d84d90e0948cf5bf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 May 2017 14:07:24 +0100 Subject: Speed up get_joined_hosts --- synapse/handlers/room_member.py | 3 ++- synapse/storage/roommember.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ab87632d99..1ca88517a2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -739,10 +739,11 @@ class RoomMemberHandler(BaseHandler): if len(current_state_ids) == 1 and create_event_id: defer.returnValue(self.hs.is_mine_id(create_event_id)) - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue + event_id = current_state_ids[(etype, state_key)] event = yield self.store.get_event(event_id, allow_none=True) if not event: continue diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 2fa20bd87c..404f3583eb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -534,7 +534,7 @@ class RoomMemberStore(SQLBaseStore): assert state_group is not None joined_hosts = set() - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype == EventTypes.Member: try: host = get_domain_from_id(state_key) @@ -545,6 +545,7 @@ class RoomMemberStore(SQLBaseStore): if host in joined_hosts: continue + event_id = current_state_ids[(etype, state_key)] event = yield self.get_event(event_id, allow_none=True) if event and event.content["membership"] == Membership.JOIN: joined_hosts.add(intern_string(host)) -- cgit 1.5.1