From 3a75de923b9183c073bbddae1e08fae546a11f7a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Mar 2018 12:19:09 +0000 Subject: Rewrite make_deferred_yieldable avoiding inlineCallbacks ... because (a) it's actually simpler (b) it might be marginally more performant? --- synapse/util/logcontext.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 94fa7cac98..a8dea15c1b 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -299,10 +299,6 @@ def preserve_fn(f): Useful for wrapping functions that return a deferred which you don't yield on. """ - def reset_context(result): - LoggingContext.set_current_context(LoggingContext.sentinel) - return result - def g(*args, **kwargs): current = LoggingContext.current_context() res = f(*args, **kwargs) @@ -323,12 +319,11 @@ def preserve_fn(f): # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) - res.addBoth(reset_context) + res.addBoth(_set_context_cb, LoggingContext.sentinel) return res return g -@defer.inlineCallbacks def make_deferred_yieldable(deferred): """Given a deferred, make it follow the Synapse logcontext rules: @@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred): (This is more-or-less the opposite operation to preserve_fn.) """ - with PreserveLoggingContext(): - r = yield deferred - defer.returnValue(r) + if isinstance(deferred, defer.Deferred) and not deferred.called: + prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) + deferred.addBoth(_set_context_cb, prev_context) + return deferred + + +def _set_context_cb(result, context): + """A callback function which just sets the logging context""" + LoggingContext.set_current_context(context) + return result # modules to ignore in `logcontext_tracer` -- cgit 1.5.1 From 0c8ba5dd1ce3e5cec201165c50f69aaa5c68c45d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 11:39:45 +0000 Subject: Split up RoomStore --- synapse/replication/slave/storage/room.py | 21 +-- synapse/storage/room.py | 239 +++++++++++++++--------------- 2 files changed, 125 insertions(+), 135 deletions(-) diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index f510384033..5ae1670157 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -14,32 +14,19 @@ # limitations under the License. from ._base import BaseSlavedStore -from synapse.storage import DataStore -from synapse.storage.room import RoomStore +from synapse.storage.room import RoomWorkerStore from ._slaved_id_tracker import SlavedIdTracker -class RoomStore(BaseSlavedStore): +class RoomStore(RoomWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(RoomStore, self).__init__(db_conn, hs) self._public_room_id_gen = SlavedIdTracker( db_conn, "public_room_list_stream", "stream_id" ) - get_public_room_ids = DataStore.get_public_room_ids.__func__ - get_current_public_room_stream_id = ( - DataStore.get_current_public_room_stream_id.__func__ - ) - get_public_room_ids_at_stream_id = ( - RoomStore.__dict__["get_public_room_ids_at_stream_id"] - ) - get_public_room_ids_at_stream_id_txn = ( - DataStore.get_public_room_ids_at_stream_id_txn.__func__ - ) - get_published_at_stream_id_txn = ( - DataStore.get_published_at_stream_id_txn.__func__ - ) - get_public_room_changes = DataStore.get_public_room_changes.__func__ + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() def stream_positions(self): result = super(RoomStore, self).stream_positions() diff --git a/synapse/storage/room.py b/synapse/storage/room.py index fff6652e05..7f2c08d7a6 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.storage._base import SQLBaseStore from synapse.storage.search import SearchStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks @@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple( ) -class RoomStore(SearchStore): +class RoomWorkerStore(SQLBaseStore): + def get_public_room_ids(self): + return self._simple_select_onecol( + table="rooms", + keyvalues={ + "is_public": True, + }, + retcol="room_id", + desc="get_public_room_ids", + ) + + @cached(num_args=2, max_entries=100) + def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): + """Get pulbic rooms for a particular list, or across all lists. + + Args: + stream_id (int) + network_tuple (ThirdPartyInstanceID): The list to use (None, None) + means the main list, None means all lsits. + """ + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, + stream_id, network_tuple=network_tuple + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, + network_tuple): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn( + txn, stream_id, network_tuple=network_tuple + ).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): + if network_tuple: + # We want to get from a particular list. No aggregation required. + + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? %s + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + if network_tuple.appservice_id is not None: + txn.execute( + sql % ("AND appservice_id = ? AND network_id = ?",), + (stream_id, network_tuple.appservice_id, network_tuple.network_id,) + ) + else: + txn.execute( + sql % ("AND appservice_id IS NULL",), + (stream_id,) + ) + return dict(txn) + else: + # We want to get from all lists, so we need to aggregate the results + + logger.info("Executing full list") + + sql = (""" + SELECT room_id, visibility + FROM public_room_list_stream + INNER JOIN ( + SELECT + room_id, max(stream_id) AS stream_id, appservice_id, + network_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id, appservice_id, network_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute( + sql, + (stream_id,) + ) + + results = {} + # A room is visible if its visible on any list. + for room_id, visibility in txn: + results[room_id] = bool(visibility) or results.get(room_id, False) + + return results + + def get_public_room_changes(self, prev_stream_id, new_stream_id, + network_tuple): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn( + txn, prev_stream_id, network_tuple + ) + + now_rooms_dict = self.get_published_at_stream_id_txn( + txn, new_stream_id, network_tuple + ) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + ) + + +class RoomStore(RoomWorkerStore, SearchStore): @defer.inlineCallbacks def store_room(self, room_id, room_creator_user_id, is_public): @@ -225,16 +345,6 @@ class RoomStore(SearchStore): ) self.hs.get_notifier().on_new_replication_data() - def get_public_room_ids(self): - return self._simple_select_onecol( - table="rooms", - keyvalues={ - "is_public": True, - }, - retcol="room_id", - desc="get_public_room_ids", - ) - def get_room_count(self): """Retrieve a list of all rooms """ @@ -326,113 +436,6 @@ class RoomStore(SearchStore): def get_current_public_room_stream_id(self): return self._public_room_id_gen.get_current_token() - @cached(num_args=2, max_entries=100) - def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): - """Get pulbic rooms for a particular list, or across all lists. - - Args: - stream_id (int) - network_tuple (ThirdPartyInstanceID): The list to use (None, None) - means the main list, None means all lsits. - """ - return self.runInteraction( - "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, - stream_id, network_tuple=network_tuple - ) - - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, - network_tuple): - return { - rm - for rm, vis in self.get_published_at_stream_id_txn( - txn, stream_id, network_tuple=network_tuple - ).items() - if vis - } - - def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): - if network_tuple: - # We want to get from a particular list. No aggregation required. - - sql = (""" - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id - FROM public_room_list_stream - WHERE stream_id <= ? %s - GROUP BY room_id - ) grouped USING (room_id, stream_id) - """) - - if network_tuple.appservice_id is not None: - txn.execute( - sql % ("AND appservice_id = ? AND network_id = ?",), - (stream_id, network_tuple.appservice_id, network_tuple.network_id,) - ) - else: - txn.execute( - sql % ("AND appservice_id IS NULL",), - (stream_id,) - ) - return dict(txn) - else: - # We want to get from all lists, so we need to aggregate the results - - logger.info("Executing full list") - - sql = (""" - SELECT room_id, visibility - FROM public_room_list_stream - INNER JOIN ( - SELECT - room_id, max(stream_id) AS stream_id, appservice_id, - network_id - FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id, appservice_id, network_id - ) grouped USING (room_id, stream_id) - """) - - txn.execute( - sql, - (stream_id,) - ) - - results = {} - # A room is visible if its visible on any list. - for room_id, visibility in txn: - results[room_id] = bool(visibility) or results.get(room_id, False) - - return results - - def get_public_room_changes(self, prev_stream_id, new_stream_id, - network_tuple): - def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn( - txn, prev_stream_id, network_tuple - ) - - now_rooms_dict = self.get_published_at_stream_id_txn( - txn, new_stream_id, network_tuple - ) - - now_rooms_visible = set( - rm for rm, vis in now_rooms_dict.items() if vis - ) - now_rooms_not_visible = set( - rm for rm, vis in now_rooms_dict.items() if not vis - ) - - newly_visible = now_rooms_visible - then_rooms - newly_unpublished = now_rooms_not_visible & then_rooms - - return newly_visible, newly_unpublished - - return self.runInteraction( - "get_public_room_changes", get_public_room_changes_txn - ) - def get_all_new_public_rooms(self, prev_id, current_id, limit): def get_all_new_public_rooms(txn): sql = (""" -- cgit 1.5.1 From a9a2d66cdd0abc2339641808698e63cb06c4a038 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 14:16:02 +0000 Subject: Split out SignatureStore and EventFederationStore --- synapse/replication/slave/storage/events.py | 50 +----- synapse/storage/event_federation.py | 264 ++++++++++++++-------------- synapse/storage/signatures.py | 8 +- 3 files changed, 148 insertions(+), 174 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index de0b26f437..d2495ff994 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -17,13 +17,13 @@ import logging from synapse.api.constants import EventTypes from synapse.storage import DataStore -from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamStore -from synapse.storage.signatures import SignatureStore +from synapse.storage.signatures import SignatureWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker @@ -40,8 +40,12 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, - EventsWorkerStore, StateGroupWorkerStore, +class SlavedEventStore(EventFederationWorkerStore, + RoomMemberWorkerStore, + EventPushActionsWorkerStore, + EventsWorkerStore, + StateGroupWorkerStore, + SignatureWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): @@ -72,9 +76,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. - get_latest_event_ids_in_room = EventFederationStore.__dict__[ - "get_latest_event_ids_in_room" - ] get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] @@ -100,48 +101,13 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, _get_events_around_txn = DataStore._get_events_around_txn.__func__ - get_backfill_events = DataStore.get_backfill_events.__func__ - _get_backfill_events = DataStore._get_backfill_events.__func__ - get_missing_events = DataStore.get_missing_events.__func__ - _get_missing_events = DataStore._get_missing_events.__func__ - - get_auth_chain = DataStore.get_auth_chain.__func__ - get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ - _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ - get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ - get_forward_extremeties_for_room = ( - DataStore.get_forward_extremeties_for_room.__func__ - ) - _get_forward_extremeties_for_room = ( - EventFederationStore.__dict__["_get_forward_extremeties_for_room"] - ) - get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ get_federation_out_pos = DataStore.get_federation_out_pos.__func__ update_federation_out_pos = DataStore.update_federation_out_pos.__func__ - get_latest_event_ids_and_hashes_in_room = ( - DataStore.get_latest_event_ids_and_hashes_in_room.__func__ - ) - _get_latest_event_ids_and_hashes_in_room = ( - DataStore._get_latest_event_ids_and_hashes_in_room.__func__ - ) - _get_event_reference_hashes_txn = ( - DataStore._get_event_reference_hashes_txn.__func__ - ) - add_event_hashes = ( - DataStore.add_event_hashes.__func__ - ) - get_event_reference_hashes = ( - SignatureStore.__dict__["get_event_reference_hashes"] - ) - get_event_reference_hash = ( - SignatureStore.__dict__["get_event_reference_hash"] - ) - def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 55a05c59d5..00ee82d300 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -15,7 +15,10 @@ from twisted.internet import defer -from ._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore +from synapse.storage.events import EventsWorkerStore +from synapse.storage.signatures import SignatureWorkerStore + from synapse.api.errors import StoreError from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 @@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) -class EventFederationStore(SQLBaseStore): - """ Responsible for storing and serving up the various graphs associated - with an event. Including the main event graph and the auth chains for an - event. - - Also has methods for getting the front (latest) and back (oldest) edges - of the event graphs. These are used to generate the parents for new events - and backfilling from another server respectively. - """ - - EVENT_AUTH_STATE_ONLY = "event_auth_state_only" - - def __init__(self, db_conn, hs): - super(EventFederationStore, self).__init__(db_conn, hs) - - self.register_background_update_handler( - self.EVENT_AUTH_STATE_ONLY, - self._background_delete_non_state_event_auth, - ) - - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) - +class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, + SQLBaseStore): def get_auth_chain(self, event_ids, include_given=False): """Get auth events for given event_ids. The events *must* be state events. @@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, room_id, depth): - min_depth = self._get_min_depth_interaction(txn, room_id) - - if min_depth and depth >= min_depth: - return - - self._simple_upsert_txn( - txn, - table="room_depth", - keyvalues={ - "room_id": room_id, - }, - values={ - "min_depth": depth, - }, - ) - - def _handle_mult_prev_events(self, txn, events): - """ - For the given event, update the event edges table and forward and - backward extremities tables. - """ - self._simple_insert_many_txn( - txn, - table="event_edges", - values=[ - { - "event_id": ev.event_id, - "prev_event_id": e_id, - "room_id": ev.room_id, - "is_state": False, - } - for ev in events - for e_id, _ in ev.prev_events - ], - ) - - self._update_backward_extremeties(txn, events) - - def _update_backward_extremeties(self, txn, events): - """Updates the event_backward_extremities tables based on the new/updated - events being persisted. - - This is called for new events *and* for events that were outliers, but - are now being persisted as non-outliers. - - Forward extremities are handled when we first start persisting the events. - """ - events_by_room = {} - for ev in events: - events_by_room.setdefault(ev.room_id, []).append(ev) - - query = ( - "INSERT INTO event_backward_extremities (event_id, room_id)" - " SELECT ?, ? WHERE NOT EXISTS (" - " SELECT 1 FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - " )" - " AND NOT EXISTS (" - " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " - " AND outlier = ?" - " )" - ) - - txn.executemany(query, [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) - for ev in events for e_id, _ in ev.prev_events - if not ev.internal_metadata.is_outlier() - ]) - - query = ( - "DELETE FROM event_backward_extremities" - " WHERE event_id = ? AND room_id = ?" - ) - txn.executemany( - query, - [ - (ev.event_id, ev.room_id) for ev in events - if not ev.internal_metadata.is_outlier() - ] - ) - def get_forward_extremeties_for_room(self, room_id, stream_ordering): """For a given room_id and stream_ordering, return the forward extremeties of the room at that point in "time". @@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore): get_forward_extremeties_for_room_txn ) - def _delete_old_forward_extrem_cache(self): - def _delete_old_forward_extrem_cache_txn(txn): - # Delete entries older than a month, while making sure we don't delete - # the only entries for a room. - sql = (""" - DELETE FROM stream_ordering_to_exterm - WHERE - room_id IN ( - SELECT room_id - FROM stream_ordering_to_exterm - WHERE stream_ordering > ? - ) AND stream_ordering < ? - """) - txn.execute( - sql, - (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) - ) - return self.runInteraction( - "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn - ) - def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` @@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore): return event_results + +class EventFederationStore(EventFederationWorkerStore): + """ Responsible for storing and serving up the various graphs associated + with an event. Including the main event graph and the auth chains for an + event. + + Also has methods for getting the front (latest) and back (oldest) edges + of the event graphs. These are used to generate the parents for new events + and backfilling from another server respectively. + """ + + EVENT_AUTH_STATE_ONLY = "event_auth_state_only" + + def __init__(self, db_conn, hs): + super(EventFederationStore, self).__init__(db_conn, hs) + + self.register_background_update_handler( + self.EVENT_AUTH_STATE_ONLY, + self._background_delete_non_state_event_auth, + ) + + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + + def _update_min_depth_for_room_txn(self, txn, room_id, depth): + min_depth = self._get_min_depth_interaction(txn, room_id) + + if min_depth and depth >= min_depth: + return + + self._simple_upsert_txn( + txn, + table="room_depth", + keyvalues={ + "room_id": room_id, + }, + values={ + "min_depth": depth, + }, + ) + + def _handle_mult_prev_events(self, txn, events): + """ + For the given event, update the event edges table and forward and + backward extremities tables. + """ + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": ev.event_id, + "prev_event_id": e_id, + "room_id": ev.room_id, + "is_state": False, + } + for ev in events + for e_id, _ in ev.prev_events + ], + ) + + self._update_backward_extremeties(txn, events) + + def _update_backward_extremeties(self, txn, events): + """Updates the event_backward_extremities tables based on the new/updated + events being persisted. + + This is called for new events *and* for events that were outliers, but + are now being persisted as non-outliers. + + Forward extremities are handled when we first start persisting the events. + """ + events_by_room = {} + for ev in events: + events_by_room.setdefault(ev.room_id, []).append(ev) + + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" + " )" + ) + + txn.executemany(query, [ + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + for ev in events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ]) + + query = ( + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + ) + txn.executemany( + query, + [ + (ev.event_id, ev.room_id) for ev in events + if not ev.internal_metadata.is_outlier() + ] + ) + + def _delete_old_forward_extrem_cache(self): + def _delete_old_forward_extrem_cache_txn(txn): + # Delete entries older than a month, while making sure we don't delete + # the only entries for a room. + sql = (""" + DELETE FROM stream_ordering_to_exterm + WHERE + room_id IN ( + SELECT room_id + FROM stream_ordering_to_exterm + WHERE stream_ordering > ? + ) AND stream_ordering < ? + """) + txn.execute( + sql, + (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) + ) + return self.runInteraction( + "_delete_old_forward_extrem_cache", + _delete_old_forward_extrem_cache_txn + ) + def clean_room_for_join(self, room_id): return self.runInteraction( "clean_room_for_join", diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 67d5d9969a..e6eeb1b641 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -22,9 +22,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash from synapse.util.caches.descriptors import cached, cachedList -class SignatureStore(SQLBaseStore): - """Persistence for event signatures and hashes""" - +class SignatureWorkerStore(SQLBaseStore): @cached() def get_event_reference_hash(self, event_id): return self._get_event_reference_hashes_txn(event_id) @@ -74,6 +72,10 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn} + +class SignatureStore(SignatureWorkerStore): + """Persistence for event signatures and hashes""" + def _store_event_reference_hashes_txn(self, txn, events): """Store a hash for a PDU Args: -- cgit 1.5.1 From 6411f725bedbc4701e9c624ae23f47d52ff0bd7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 14:05:41 +0000 Subject: Calculate stream_ordering_month_ago correctly on workers --- synapse/replication/slave/storage/events.py | 1 - synapse/storage/__init__.py | 15 --- synapse/storage/event_push_actions.py | 149 ++++++++++++++++------------ 3 files changed, 85 insertions(+), 80 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index de0b26f437..a4d7430f9b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -67,7 +67,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, "MembershipStreamChangeCache", events_max, ) - self.stream_ordering_month_ago = 0 self._stream_order_on_start = self.get_room_max_stream_ordering() # Cached functions can't be accessed through a class instance so we need diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0f136f8a06..b3cdcfdc21 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -228,20 +227,6 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=_group_updates_prefill, ) - cur = LoggingTransaction( - db_conn.cursor(), - name="_find_stream_orderings_for_times_txn", - database_engine=self.database_engine, - after_callbacks=[], - final_callbacks=[], - ) - self._find_stream_orderings_for_times_txn(cur) - cur.close() - - self.find_stream_orderings_looping_call = self._clock.looping_call( - self._find_stream_orderings_for_times, 10 * 60 * 1000 - ) - self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 6454045c2d..c08bebe112 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore, LoggingTransaction from twisted.internet import defer from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight): class EventPushActionsWorkerStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(EventPushActionsWorkerStore, self).__init__(db_conn, hs) + + # These get correctly ste by _find_stream_orderings_for_times_txn + self.stream_ordering_month_ago = 0 + self.stream_ordering_day_ago = 0 + + cur = LoggingTransaction( + db_conn.cursor(), + name="_find_stream_orderings_for_times_txn", + database_engine=self.database_engine, + after_callbacks=[], + final_callbacks=[], + ) + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 10 * 60 * 1000 + ) + @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id @@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore): desc="remove_push_actions_from_staging", ) + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago + ) + logger.info("Searching for stream ordering 1 day ago") + self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 day ago: it's %d", + self.stream_ordering_day_ago + ) + + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" @@ -650,69 +734,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) - @defer.inlineCallbacks - def _find_stream_orderings_for_times(self): - yield self.runInteraction( - "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn - ) - - def _find_stream_orderings_for_times_txn(self, txn): - logger.info("Searching for stream ordering 1 month ago") - self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( - txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 - ) - logger.info( - "Found stream ordering 1 month ago: it's %d", - self.stream_ordering_month_ago - ) - logger.info("Searching for stream ordering 1 day ago") - self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn( - txn, self._clock.time_msec() - 24 * 60 * 60 * 1000 - ) - logger.info( - "Found stream ordering 1 day ago: it's %d", - self.stream_ordering_day_ago - ) - - def _find_first_stream_ordering_after_ts_txn(self, txn, ts): - """ - Find the stream_ordering of the first event that was received after - a given timestamp. This is relatively slow as there is no index on - received_ts but we can then use this to delete push actions before - this. - - received_ts must necessarily be in the same order as stream_ordering - and stream_ordering is indexed, so we manually binary search using - stream_ordering - """ - txn.execute("SELECT MAX(stream_ordering) FROM events") - max_stream_ordering = txn.fetchone()[0] - - if max_stream_ordering is None: - return 0 - - range_start = 0 - range_end = max_stream_ordering - - sql = ( - "SELECT received_ts FROM events" - " WHERE stream_ordering > ?" - " ORDER BY stream_ordering" - " LIMIT 1" - ) - - while range_end - range_start > 1: - middle = int((range_end + range_start) / 2) - txn.execute(sql, (middle,)) - middle_ts = txn.fetchone()[0] - if ts > middle_ts: - range_start = middle - else: - range_end = middle - - return range_end - @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: -- cgit 1.5.1 From 784f036306a020fcde495887c2881209b913b9b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 10:54:37 +0000 Subject: Move RoomMemberHandler out of Handlers --- synapse/handlers/__init__.py | 2 -- synapse/handlers/_base.py | 2 +- synapse/handlers/federation.py | 4 +-- synapse/handlers/profile.py | 2 +- synapse/handlers/room.py | 4 +-- synapse/handlers/room_member.py | 54 ++++++++++++++++++-------------- synapse/rest/client/v1/admin.py | 7 +++-- synapse/rest/client/v1/room.py | 19 +++++------ synapse/rest/client/v2_alpha/register.py | 2 +- synapse/server.py | 5 +++ tests/rest/client/v1/test_typing.py | 2 +- 11 files changed, 58 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 53213cdccf..8f8fd82eb0 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,6 @@ from .register import RegistrationHandler from .room import ( RoomCreationHandler, RoomContextHandler, ) -from .room_member import RoomMemberHandler from .message import MessageHandler from .federation import FederationHandler from .directory import DirectoryHandler @@ -49,7 +48,6 @@ class Handlers(object): self.registration_handler = RegistrationHandler(hs) self.message_handler = MessageHandler(hs) self.room_creation_handler = RoomCreationHandler(hs) - self.room_member_handler = RoomMemberHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index faa5609c0c..e089e66fde 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -158,7 +158,7 @@ class BaseHandler(object): # homeserver. requester = synapse.types.create_requester( target_user, is_guest=True) - handler = self.hs.get_handlers().room_member_handler + handler = self.hs.get_room_member_handler() yield handler.update_membership( requester, target_user, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8832ba58bc..520612683e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2153,7 +2153,7 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) - member_handler = self.hs.get_handlers().room_member_handler + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) @@ -2197,7 +2197,7 @@ class FederationHandler(BaseHandler): # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) - member_handler = self.hs.get_handlers().room_member_handler + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9800e24453..c9c2879038 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -233,7 +233,7 @@ class ProfileHandler(BaseHandler): ) for room_id in room_ids: - handler = self.hs.get_handlers().room_member_handler + handler = self.hs.get_room_member_handler() try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6ab020bf41..6c425828c1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -165,7 +165,7 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) - room_member_handler = self.hs.get_handlers().room_member_handler + room_member_handler = self.hs.get_room_member_handler() yield self._send_events_for_new_room( requester, @@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler): id_server = invite_3pid["id_server"] address = invite_3pid["address"] medium = invite_3pid["medium"] - yield self.hs.get_handlers().room_member_handler.do_3pid_invite( + yield self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 37dc5e99ab..0329432f5c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -30,24 +30,33 @@ from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room -from ._base import BaseHandler logger = logging.getLogger(__name__) id_server_scheme = "https://" -class RoomMemberHandler(BaseHandler): +class RoomMemberHandler(object): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level # API that takes ID strings and returns pagination chunks. These concerns # ought to be separated out a lot better. def __init__(self, hs): - super(RoomMemberHandler, self).__init__(hs) - + self.store = hs.get_datastore() + self.auth = hs.get_auth() + self.state_handler = hs.get_state_handler() + self.config = hs.config + self.is_mine = hs.is_mine + self.is_mine_id = hs.is_mine_id + self.simple_http_client = hs.get_simple_http_client() + + self.federation_handler = hs.get_handlers().federation_handler + self.directory_handler = hs.get_handlers().directory_handler + self.registration_handler = hs.get_handlers().registration_handler self.profile_handler = hs.get_profile_handler() self.event_creation_hander = hs.get_event_creation_handler() + self.replication_layer = hs.get_replication_layer() self.member_linearizer = Linearizer(name="member") @@ -138,7 +147,7 @@ class RoomMemberHandler(BaseHandler): # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - yield self.hs.get_handlers().federation_handler.do_invite_join( + yield self.federation_handler.do_invite_join( remote_room_hosts, room_id, user.to_string(), @@ -204,8 +213,7 @@ class RoomMemberHandler(BaseHandler): # if this is a join with a 3pid signature, we may need to turn a 3pid # invite into a normal invite before we can handle the join. if third_party_signed is not None: - replication = self.hs.get_replication_layer() - yield replication.exchange_third_party_invite( + yield self.replication_layer.exchange_third_party_invite( third_party_signed["sender"], target.to_string(), room_id, @@ -226,7 +234,7 @@ class RoomMemberHandler(BaseHandler): requester.user, ) if not is_requester_admin: - if self.hs.config.block_non_admin_invites: + if self.config.block_non_admin_invites: logger.info( "Blocking invite: user is not admin and non-admin " "invites disabled" @@ -286,7 +294,7 @@ class RoomMemberHandler(BaseHandler): if not is_host_in_room: inviter = yield self.get_inviter(target.to_string(), room_id) - if inviter and not self.hs.is_mine(inviter): + if inviter and not self.is_mine(inviter): remote_room_hosts.append(inviter.domain) content["membership"] = Membership.JOIN @@ -311,7 +319,7 @@ class RoomMemberHandler(BaseHandler): if not inviter: raise SynapseError(404, "Not a known room") - if self.hs.is_mine(inviter): + if self.is_mine(inviter): # the inviter was on our server, but has now left. Carry on # with the normal rejection codepath. # @@ -321,7 +329,7 @@ class RoomMemberHandler(BaseHandler): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - fed_handler = self.hs.get_handlers().federation_handler + fed_handler = self.federation_handler try: ret = yield fed_handler.do_remotely_reject_invite( remote_room_hosts, @@ -393,7 +401,7 @@ class RoomMemberHandler(BaseHandler): "Sender (%s) must be same as requester (%s)" % (sender, requester.user) ) - assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) + assert self.is_mine(sender), "Sender must be our own: %s" % (sender,) else: requester = synapse.types.create_requester(target_user) @@ -477,7 +485,7 @@ class RoomMemberHandler(BaseHandler): Raises: SynapseError if room alias could not be found. """ - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.directory_handler mapping = yield directory_handler.get_association(room_alias) if not mapping: @@ -508,7 +516,7 @@ class RoomMemberHandler(BaseHandler): requester, txn_id ): - if self.hs.config.block_non_admin_invites: + if self.config.block_non_admin_invites: is_requester_admin = yield self.auth.is_server_admin( requester.user, ) @@ -555,7 +563,7 @@ class RoomMemberHandler(BaseHandler): str: the matrix ID of the 3pid, or None if it is not recognized. """ try: - data = yield self.hs.get_simple_http_client().get_json( + data = yield self.simple_http_client.get_json( "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,), { "medium": medium, @@ -578,7 +586,7 @@ class RoomMemberHandler(BaseHandler): if server_hostname not in data["signatures"]: raise AuthError(401, "No signature from server %s" % (server_hostname,)) for key_name, signature in data["signatures"][server_hostname].items(): - key_data = yield self.hs.get_simple_http_client().get_json( + key_data = yield self.simple_http_client.get_json( "%s%s/_matrix/identity/api/v1/pubkey/%s" % (id_server_scheme, server_hostname, key_name,), ) @@ -603,7 +611,7 @@ class RoomMemberHandler(BaseHandler): user, txn_id ): - room_state = yield self.hs.get_state_handler().get_current_state(room_id) + room_state = yield self.state_handler.get_current_state(room_id) inviter_display_name = "" inviter_avatar_url = "" @@ -727,15 +735,15 @@ class RoomMemberHandler(BaseHandler): "sender_avatar_url": inviter_avatar_url, } - if self.hs.config.invite_3pid_guest: - registration_handler = self.hs.get_handlers().registration_handler + if self.config.invite_3pid_guest: + registration_handler = self.registration_handler guest_access_token = yield registration_handler.guest_access_token_for( medium=medium, address=address, inviter_user_id=inviter_user_id, ) - guest_user_info = yield self.hs.get_auth().get_user_by_access_token( + guest_user_info = yield self.auth.get_user_by_access_token( guest_access_token ) @@ -744,7 +752,7 @@ class RoomMemberHandler(BaseHandler): "guest_user_id": guest_user_info["user"].to_string(), }) - data = yield self.hs.get_simple_http_client().post_urlencoded_get_json( + data = yield self.simple_http_client.post_urlencoded_get_json( is_url, invite_config ) @@ -793,10 +801,10 @@ class RoomMemberHandler(BaseHandler): # first member event? create_event_id = current_state_ids.get(("m.room.create", "")) if len(current_state_ids) == 1 and create_event_id: - defer.returnValue(self.hs.is_mine_id(create_event_id)) + defer.returnValue(self.is_mine_id(create_event_id)) for etype, state_key in current_state_ids: - if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): + if etype != EventTypes.Member or not self.is_mine_id(state_key): continue event_id = current_state_ids[(etype, state_key)] diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 6073cc6fa2..3917eee42d 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -180,6 +180,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() self.state = hs.get_state_handler() self.event_creation_handler = hs.get_event_creation_handler() + self.room_member_handler = hs.get_room_member_handler() @defer.inlineCallbacks def on_POST(self, request, room_id): @@ -238,7 +239,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): logger.info("Kicking %r from %r...", user_id, room_id) target_requester = create_requester(user_id) - yield self.handlers.room_member_handler.update_membership( + yield self.room_member_handler.update_membership( requester=target_requester, target=target_requester.user, room_id=room_id, @@ -247,9 +248,9 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ratelimit=False ) - yield self.handlers.room_member_handler.forget(target_requester.user, room_id) + yield self.room_member_handler.forget(target_requester.user, room_id) - yield self.handlers.room_member_handler.update_membership( + yield self.room_member_handler.update_membership( requester=target_requester, target=target_requester.user, room_id=new_room_id, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 817fd47842..9d745174c7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -84,6 +84,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): super(RoomStateEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() self.event_creation_hander = hs.get_event_creation_handler() + self.room_member_handler = hs.get_room_member_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -156,7 +157,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): if event_type == EventTypes.Member: membership = content.get("membership", None) - event = yield self.handlers.room_member_handler.update_membership( + event = yield self.room_member_handler.update_membership( requester, target=UserID.from_string(state_key), room_id=room_id, @@ -229,7 +230,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): class JoinRoomAliasServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinRoomAliasServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.room_member_handler = hs.get_room_member_handler() def register(self, http_server): # /join/$room_identifier[/$txn_id] @@ -257,7 +258,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): except Exception: remote_room_hosts = None elif RoomAlias.is_valid(room_identifier): - handler = self.handlers.room_member_handler + handler = self.room_member_handler room_alias = RoomAlias.from_string(room_identifier) room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias) room_id = room_id.to_string() @@ -266,7 +267,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): room_identifier, )) - yield self.handlers.room_member_handler.update_membership( + yield self.room_member_handler.update_membership( requester=requester, target=requester.user, room_id=room_id, @@ -562,7 +563,7 @@ class RoomEventContextServlet(ClientV1RestServlet): class RoomForgetRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomForgetRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.room_member_handler = hs.get_room_member_handler() def register(self, http_server): PATTERNS = ("/rooms/(?P[^/]*)/forget") @@ -575,7 +576,7 @@ class RoomForgetRestServlet(ClientV1RestServlet): allow_guest=False, ) - yield self.handlers.room_member_handler.forget( + yield self.room_member_handler.forget( user=requester.user, room_id=room_id, ) @@ -593,7 +594,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMembershipRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.room_member_handler = hs.get_room_member_handler() def register(self, http_server): # /rooms/$roomid/[invite|join|leave] @@ -622,7 +623,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): content = {} if membership_action == "invite" and self._has_3pid_invite_keys(content): - yield self.handlers.room_member_handler.do_3pid_invite( + yield self.room_member_handler.do_3pid_invite( room_id, requester.user, content["medium"], @@ -644,7 +645,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): if 'reason' in content and membership_action in ['kick', 'ban']: event_content = {'reason': content['reason']} - yield self.handlers.room_member_handler.update_membership( + yield self.room_member_handler.update_membership( requester=requester, target=target, room_id=room_id, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c6f4680a76..0ba62bddc1 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -183,7 +183,7 @@ class RegisterRestServlet(RestServlet): self.auth_handler = hs.get_auth_handler() self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler - self.room_member_handler = hs.get_handlers().room_member_handler + self.room_member_handler = hs.get_room_member_handler() self.device_handler = hs.get_device_handler() self.macaroon_gen = hs.get_macaroon_generator() diff --git a/synapse/server.py b/synapse/server.py index fbd602d40e..5b6effbe31 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -45,6 +45,7 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler +from synapse.handlers.room_member import RoomMemberHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler @@ -145,6 +146,7 @@ class HomeServer(object): 'groups_attestation_signing', 'groups_attestation_renewer', 'spam_checker', + 'room_member_handler', ] def __init__(self, hostname, **kwargs): @@ -382,6 +384,9 @@ class HomeServer(object): def build_spam_checker(self): return SpamChecker(self) + def build_room_member_handler(self): + return RoomMemberHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index a269e6f56e..e46534cd35 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -95,7 +95,7 @@ class RoomTypingTestCase(RestTestCase): else: if remotedomains is not None: remotedomains.add(member.domain) - hs.get_handlers().room_member_handler.fetch_room_distributions_into = ( + hs.get_room_member_handler().fetch_room_distributions_into = ( fetch_room_distributions_into ) -- cgit 1.5.1 From f793bc38770caf81dc34b9033d7dd2c9bfc0d79b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 13:56:03 +0000 Subject: Split out stream store --- synapse/replication/slave/storage/events.py | 54 +---- synapse/storage/__init__.py | 8 - synapse/storage/stream.py | 350 +++++++++++++++------------- 3 files changed, 202 insertions(+), 210 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index de0b26f437..517a9f0ec6 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.state import StateGroupWorkerStore -from synapse.storage.stream import StreamStore +from synapse.storage.stream import StreamWorkerStore from synapse.storage.signatures import SignatureStore -from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker @@ -41,34 +40,20 @@ logger = logging.getLogger(__name__) class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, - EventsWorkerStore, StateGroupWorkerStore, + StreamWorkerStore, EventsWorkerStore, StateGroupWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): - super(SlavedEventStore, self).__init__(db_conn, hs) self._stream_id_gen = SlavedIdTracker( db_conn, "events", "stream_ordering", ) self._backfill_id_gen = SlavedIdTracker( db_conn, "events", "stream_ordering", step=-1 ) - events_max = self._stream_id_gen.get_current_token() - event_cache_prefill, min_event_val = self._get_cache_dict( - db_conn, "events", - entity_column="room_id", - stream_column="stream_ordering", - max_value=events_max, - ) - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", min_event_val, - prefilled_cache=event_cache_prefill, - ) - self._membership_stream_cache = StreamChangeCache( - "MembershipStreamChangeCache", events_max, - ) + + super(SlavedEventStore, self).__init__(db_conn, hs) self.stream_ordering_month_ago = 0 - self._stream_order_on_start = self.get_room_max_stream_ordering() # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. @@ -76,30 +61,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, "get_latest_event_ids_in_room" ] - get_recent_event_ids_for_room = ( - StreamStore.__dict__["get_recent_event_ids_for_room"] - ) - has_room_changed_since = DataStore.has_room_changed_since.__func__ - - get_membership_changes_for_user = ( - DataStore.get_membership_changes_for_user.__func__ - ) - get_room_events_max_id = DataStore.get_room_events_max_id.__func__ - get_room_events_stream_for_room = ( - DataStore.get_room_events_stream_for_room.__func__ - ) - get_events_around = DataStore.get_events_around.__func__ - - get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ - get_room_events_stream_for_rooms = ( - DataStore.get_room_events_stream_for_rooms.__func__ - ) - get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ - - _set_before_and_after = staticmethod(DataStore._set_before_and_after) - - _get_events_around_txn = DataStore._get_events_around_txn.__func__ - get_backfill_events = DataStore.get_backfill_events.__func__ _get_backfill_events = DataStore._get_backfill_events.__func__ get_missing_events = DataStore.get_missing_events.__func__ @@ -120,8 +81,11 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore, get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ - get_federation_out_pos = DataStore.get_federation_out_pos.__func__ - update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() get_latest_event_ids_and_hashes_in_room = ( DataStore.get_latest_event_ids_and_hashes_in_room.__func__ diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0f136f8a06..0ce76d7a8c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -148,14 +148,6 @@ class DataStore(RoomMemberStore, RoomStore, stream_column="stream_ordering", max_value=events_max, ) - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", min_event_val, - prefilled_cache=event_cache_prefill, - ) - - self._membership_stream_cache = StreamChangeCache( - "MembershipStreamChangeCache", events_max, - ) self._presence_on_startup = self._get_active_presence(db_conn) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 52bdce5be2..057f30db33 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,13 +35,17 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore +from synapse.storage.events import EventsWorkerStore + from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.storage.engines import PostgresEngine, Sqlite3Engine +import abc import logging @@ -143,81 +147,28 @@ def filter_to_clause(event_filter): return " AND ".join(clauses), args -class StreamStore(SQLBaseStore): - @defer.inlineCallbacks - def get_appservice_room_stream(self, service, from_key, to_key, limit=0): - # NB this lives here instead of appservice.py so we can reuse the - # 'private' StreamToken class in this file. - if limit: - limit = max(limit, MAX_STREAM_SIZE) - else: - limit = MAX_STREAM_SIZE +class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): + __metaclass__ = abc.ABCMeta - # From and to keys should be integers from ordering. - from_id = RoomStreamToken.parse_stream_token(from_key) - to_id = RoomStreamToken.parse_stream_token(to_key) - - if from_key == to_key: - defer.returnValue(([], to_key)) - return + def __init__(self, db_conn, hs): + super(StreamWorkerStore, self).__init__(db_conn, hs) - # select all the events between from/to with a sensible limit - sql = ( - "SELECT e.event_id, e.room_id, e.type, s.state_key, " - "e.stream_ordering FROM events AS e " - "LEFT JOIN state_events as s ON " - "e.event_id = s.event_id " - "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " - "ORDER BY stream_ordering ASC LIMIT %(limit)d " - ) % { - "limit": limit - } - - def f(txn): - # pull out all the events between the tokens - txn.execute(sql, (from_id.stream, to_id.stream,)) - rows = self.cursor_to_dict(txn) - - # Logic: - # - We want ALL events which match the AS room_id regex - # - We want ALL events which match the rooms represented by the AS - # room_alias regex - # - We want ALL events for rooms that AS users have joined. - # This is currently supported via get_app_service_rooms (which is - # used for the Notifier listener rooms). We can't reasonably make a - # SQL query for these room IDs, so we'll pull all the events between - # from/to and filter in python. - rooms_for_as = self._get_app_service_rooms_txn(txn, service) - room_ids_for_as = [r.room_id for r in rooms_for_as] - - def app_service_interested(row): - if row["room_id"] in room_ids_for_as: - return True - - if row["type"] == EventTypes.Member: - if service.is_interested_in_user(row.get("state_key")): - return True - return False - - return [r for r in rows if app_service_interested(r)] - - rows = yield self.runInteraction("get_appservice_room_stream", f) - - ret = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True + events_max = self.get_room_max_stream_ordering() + event_cache_prefill, min_event_val = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", min_event_val, + prefilled_cache=event_cache_prefill, + ) + self._membership_stream_cache = StreamChangeCache( + "MembershipStreamChangeCache", events_max, ) - self._set_before_and_after(ret, rows, topo_order=from_id is None) - - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key - - defer.returnValue((ret, key)) + self._stream_order_on_start = self.get_room_max_stream_ordering() @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, @@ -380,88 +331,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(ret) - @defer.inlineCallbacks - def paginate_room_events(self, room_id, from_key, to_key=None, - direction='b', limit=-1, event_filter=None): - # Tokens really represent positions between elements, but we use - # the convention of pointing to the event before the gap. Hence - # we have a bit of asymmetry when it comes to equalities. - args = [False, room_id] - if direction == 'b': - order = "DESC" - bounds = upper_bound( - RoomStreamToken.parse(from_key), self.database_engine - ) - if to_key: - bounds = "%s AND %s" % (bounds, lower_bound( - RoomStreamToken.parse(to_key), self.database_engine - )) - else: - order = "ASC" - bounds = lower_bound( - RoomStreamToken.parse(from_key), self.database_engine - ) - if to_key: - bounds = "%s AND %s" % (bounds, upper_bound( - RoomStreamToken.parse(to_key), self.database_engine - )) - - filter_clause, filter_args = filter_to_clause(event_filter) - - if filter_clause: - bounds += " AND " + filter_clause - args.extend(filter_args) - - if int(limit) > 0: - args.append(int(limit)) - limit_str = " LIMIT ?" - else: - limit_str = "" - - sql = ( - "SELECT * FROM events" - " WHERE outlier = ? AND room_id = ? AND %(bounds)s" - " ORDER BY topological_ordering %(order)s," - " stream_ordering %(order)s %(limit)s" - ) % { - "bounds": bounds, - "order": order, - "limit": limit_str - } - - def f(txn): - txn.execute(sql, args) - - rows = self.cursor_to_dict(txn) - - if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] - if direction == 'b': - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - toke -= 1 - next_token = str(RoomStreamToken(topo, toke)) - else: - # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key - - return rows, next_token, - - rows, token = yield self.runInteraction("paginate_room_events", f) - - events = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - - self._set_before_and_after(events, rows) - - defer.returnValue((events, token)) - @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): rows, token = yield self.get_recent_event_ids_for_room( @@ -542,7 +411,7 @@ class StreamStore(SQLBaseStore): `room_id` causes it to return the current room specific topological token. """ - token = yield self._stream_id_gen.get_current_token() + token = yield self.get_room_max_stream_ordering() if room_id is None: defer.returnValue("s%d" % (token,)) else: @@ -552,11 +421,13 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + @abc.abstractmethod def get_room_max_stream_ordering(self): - return self._stream_id_gen.get_current_token() + raise NotImplementedError() + @abc.abstractmethod def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() + raise NotImplementedError() def get_stream_token_for_event(self, event_id): """The stream token for an event @@ -832,3 +703,168 @@ class StreamStore(SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) + + +class StreamStore(StreamWorkerStore): + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() + + @defer.inlineCallbacks + def get_appservice_room_stream(self, service, from_key, to_key, limit=0): + # NB this lives here instead of appservice.py so we can reuse the + # 'private' StreamToken class in this file. + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = RoomStreamToken.parse_stream_token(from_key) + to_id = RoomStreamToken.parse_stream_token(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.type, s.state_key, " + "e.stream_ordering FROM events AS e " + "LEFT JOIN state_events as s ON " + "e.event_id = s.event_id " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + # pull out all the events between the tokens + txn.execute(sql, (from_id.stream, to_id.stream,)) + rows = self.cursor_to_dict(txn) + + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is + # used for the Notifier listener rooms). We can't reasonably make a + # SQL query for these room IDs, so we'll pull all the events between + # from/to and filter in python. + rooms_for_as = self._get_app_service_rooms_txn(txn, service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + if service.is_interested_in_user(row.get("state_key")): + return True + return False + + return [r for r in rows if app_service_interested(r)] + + rows = yield self.runInteraction("get_appservice_room_stream", f) + + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows, topo_order=from_id is None) + + if rows: + key = "s%d" % max(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + defer.returnValue((ret, key)) + + @defer.inlineCallbacks + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, event_filter=None): + # Tokens really represent positions between elements, but we use + # the convention of pointing to the event before the gap. Hence + # we have a bit of asymmetry when it comes to equalities. + args = [False, room_id] + if direction == 'b': + order = "DESC" + bounds = upper_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) + if to_key: + bounds = "%s AND %s" % (bounds, lower_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) + else: + order = "ASC" + bounds = lower_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) + if to_key: + bounds = "%s AND %s" % (bounds, upper_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) + + filter_clause, filter_args = filter_to_clause(event_filter) + + if filter_clause: + bounds += " AND " + filter_clause + args.extend(filter_args) + + if int(limit) > 0: + args.append(int(limit)) + limit_str = " LIMIT ?" + else: + limit_str = "" + + sql = ( + "SELECT * FROM events" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" + " ORDER BY topological_ordering %(order)s," + " stream_ordering %(order)s %(limit)s" + ) % { + "bounds": bounds, + "order": order, + "limit": limit_str + } + + def f(txn): + txn.execute(sql, args) + + rows = self.cursor_to_dict(txn) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + toke -= 1 + next_token = str(RoomStreamToken(topo, toke)) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + return rows, next_token, + + rows, token = yield self.runInteraction("paginate_room_events", f) + + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) -- cgit 1.5.1 From 22004b524e5264afb0e883bee486f669ea58833c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 15:59:40 +0000 Subject: Fix comment typo --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index c08bebe112..848d8bd728 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -67,7 +67,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): super(EventPushActionsWorkerStore, self).__init__(db_conn, hs) - # These get correctly ste by _find_stream_orderings_for_times_txn + # These get correctly set by _find_stream_orderings_for_times_txn self.stream_ordering_month_ago = 0 self.stream_ordering_day_ago = 0 -- cgit 1.5.1 From 872ff95ed49c9cb30ab5f256c5ff539430e658db Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 16:00:05 +0000 Subject: Default stream_ordering_*_ago to None --- synapse/storage/event_push_actions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 848d8bd728..7164293568 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -68,8 +68,8 @@ class EventPushActionsWorkerStore(SQLBaseStore): super(EventPushActionsWorkerStore, self).__init__(db_conn, hs) # These get correctly set by _find_stream_orderings_for_times_txn - self.stream_ordering_month_ago = 0 - self.stream_ordering_day_ago = 0 + self.stream_ordering_month_ago = None + self.stream_ordering_day_ago = None cur = LoggingTransaction( db_conn.cursor(), -- cgit 1.5.1 From 1b2af116502e6ebcd2ae24178754145e59ee7c24 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 16:20:57 +0000 Subject: Document abstract class and method better --- synapse/storage/stream.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 057f30db33..a2527d2a36 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -148,6 +148,11 @@ def filter_to_clause(event_filter): class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): + """This is an abstract base class where subclasses must implement + `get_room_max_stream_ordering` and `get_room_min_stream_ordering` + which can be called in the initializer. + """ + __metaclass__ = abc.ABCMeta def __init__(self, db_conn, hs): @@ -170,6 +175,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() + @abc.abstractmethod + def get_room_max_stream_ordering(self): + raise NotImplementedError() + + @abc.abstractmethod + def get_room_min_stream_ordering(self): + raise NotImplementedError() + @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, order='DESC'): @@ -421,14 +434,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) - @abc.abstractmethod - def get_room_max_stream_ordering(self): - raise NotImplementedError() - - @abc.abstractmethod - def get_room_min_stream_ordering(self): - raise NotImplementedError() - def get_stream_token_for_event(self, event_id): """The stream token for an event Args: -- cgit 1.5.1 From 884b26ae4150f19bd1e020c3eed934e978518a09 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 16:23:48 +0000 Subject: Remove unused variables --- synapse/storage/__init__.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0ce76d7a8c..22c156c15b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -141,14 +141,6 @@ class DataStore(RoomMemberStore, RoomStore, else: self._cache_id_gen = None - events_max = self._stream_id_gen.get_current_token() - event_cache_prefill, min_event_val = self._get_cache_dict( - db_conn, "events", - entity_column="room_id", - stream_column="stream_ordering", - max_value=events_max, - ) - self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self._get_cache_dict( @@ -196,6 +188,7 @@ class DataStore(RoomMemberStore, RoomStore, "DeviceListFederationStreamChangeCache", device_list_max, ) + events_max = self._stream_id_gen.get_current_token() curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( db_conn, "current_state_delta_stream", entity_column="room_id", -- cgit 1.5.1 From 7c371834ccbdf33f7070981bf23cbf11d1c1c333 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 16:40:27 +0000 Subject: Stub out broken function only used for cache --- synapse/storage/signatures.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index e6eeb1b641..9e6eaaa532 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -25,7 +25,9 @@ from synapse.util.caches.descriptors import cached, cachedList class SignatureWorkerStore(SQLBaseStore): @cached() def get_event_reference_hash(self, event_id): - return self._get_event_reference_hashes_txn(event_id) + # This is a dummy function to allow get_event_reference_hashes + # to use its cache + raise NotImplementedError() @cachedList(cached_method_name="get_event_reference_hash", list_name="event_ids", num_args=1) -- cgit 1.5.1 From 529c026ac149705d0c9948fed71e3f0ca069b759 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 16:49:12 +0000 Subject: Move back to hs.is_mine --- synapse/handlers/room_member.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0329432f5c..7ecdf87246 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -43,12 +43,11 @@ class RoomMemberHandler(object): # ought to be separated out a lot better. def __init__(self, hs): + self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config - self.is_mine = hs.is_mine - self.is_mine_id = hs.is_mine_id self.simple_http_client = hs.get_simple_http_client() self.federation_handler = hs.get_handlers().federation_handler @@ -294,7 +293,7 @@ class RoomMemberHandler(object): if not is_host_in_room: inviter = yield self.get_inviter(target.to_string(), room_id) - if inviter and not self.is_mine(inviter): + if inviter and not self.hs.is_mine(inviter): remote_room_hosts.append(inviter.domain) content["membership"] = Membership.JOIN @@ -319,7 +318,7 @@ class RoomMemberHandler(object): if not inviter: raise SynapseError(404, "Not a known room") - if self.is_mine(inviter): + if self.hs.is_mine(inviter): # the inviter was on our server, but has now left. Carry on # with the normal rejection codepath. # @@ -401,7 +400,7 @@ class RoomMemberHandler(object): "Sender (%s) must be same as requester (%s)" % (sender, requester.user) ) - assert self.is_mine(sender), "Sender must be our own: %s" % (sender,) + assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: requester = synapse.types.create_requester(target_user) @@ -801,10 +800,10 @@ class RoomMemberHandler(object): # first member event? create_event_id = current_state_ids.get(("m.room.create", "")) if len(current_state_ids) == 1 and create_event_id: - defer.returnValue(self.is_mine_id(create_event_id)) + defer.returnValue(self.hs.is_mine_id(create_event_id)) for etype, state_key in current_state_ids: - if etype != EventTypes.Member or not self.is_mine_id(state_key): + if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue event_id = current_state_ids[(etype, state_key)] -- cgit 1.5.1 From 65cf454fd1dda2def0920973e90de3ff305462d4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 17:10:40 +0000 Subject: Remove unused DataStore --- synapse/replication/slave/storage/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index f0d70a6734..b1f64ef0d8 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,6 @@ import logging from synapse.api.constants import EventTypes -from synapse.storage import DataStore from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore -- cgit 1.5.1 From d960d23830cb8bffe49a1eafed21bf66c25e4235 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Mar 2018 11:03:18 +0000 Subject: Add missing yield during 3pid signature checks --- synapse/handlers/room_member.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7ecdf87246..ed3b97730d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -573,7 +573,7 @@ class RoomMemberHandler(object): if "mxid" in data: if "signatures" not in data: raise AuthError(401, "No signatures on 3pid binding") - self.verify_any_signature(data, id_server) + yield self.verify_any_signature(data, id_server) defer.returnValue(data["mxid"]) except IOError as e: -- cgit 1.5.1 From fafa3e7114bdadd52f190892821ae61862ebfb2c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 18:19:34 +0000 Subject: Split registration store --- synapse/replication/slave/storage/registration.py | 18 +--- synapse/storage/registration.py | 118 +++++++++++----------- 2 files changed, 64 insertions(+), 72 deletions(-) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index e27c7332d2..7323bf0f1e 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -14,20 +14,8 @@ # limitations under the License. from ._base import BaseSlavedStore -from synapse.storage import DataStore -from synapse.storage.registration import RegistrationStore +from synapse.storage.registration import RegistrationWorkerStore -class SlavedRegistrationStore(BaseSlavedStore): - def __init__(self, db_conn, hs): - super(SlavedRegistrationStore, self).__init__(db_conn, hs) - - # TODO: use the cached version and invalidate deleted tokens - get_user_by_access_token = RegistrationStore.__dict__[ - "get_user_by_access_token" - ] - - _query_for_auth = DataStore._query_for_auth.__func__ - get_user_by_id = RegistrationStore.__dict__[ - "get_user_by_id" - ] +class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 95f75d6df1..d809b2ba46 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -19,10 +19,70 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes from synapse.storage import background_updates +from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -class RegistrationStore(background_updates.BackgroundUpdateStore): +class RegistrationWorkerStore(SQLBaseStore): + @cached() + def get_user_by_id(self, user_id): + return self._simple_select_one( + table="users", + keyvalues={ + "name": user_id, + }, + retcols=["name", "password_hash", "is_guest"], + allow_none=True, + desc="get_user_by_id", + ) + + @cached() + def get_user_by_access_token(self, token): + """Get a user from the given access token. + + Args: + token (str): The access token of a user. + Returns: + defer.Deferred: None, if the token did not match, otherwise dict + including the keys `name`, `is_guest`, `device_id`, `token_id`. + """ + return self.runInteraction( + "get_user_by_access_token", + self._query_for_auth, + token + ) + + @defer.inlineCallbacks + def is_server_admin(self, user): + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user.to_string()}, + retcol="admin", + allow_none=True, + desc="is_server_admin", + ) + + defer.returnValue(res if res else False) + + def _query_for_auth(self, txn, token): + sql = ( + "SELECT users.name, users.is_guest, access_tokens.id as token_id," + " access_tokens.device_id" + " FROM users" + " INNER JOIN access_tokens on users.name = access_tokens.user_id" + " WHERE token = ?" + ) + + txn.execute(sql, (token,)) + rows = self.cursor_to_dict(txn) + if rows: + return rows[0] + + return None + + +class RegistrationStore(RegistrationWorkerStore, + background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): super(RegistrationStore, self).__init__(db_conn, hs) @@ -187,18 +247,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): ) txn.call_after(self.is_guest.invalidate, (user_id,)) - @cached() - def get_user_by_id(self, user_id): - return self._simple_select_one( - table="users", - keyvalues={ - "name": user_id, - }, - retcols=["name", "password_hash", "is_guest"], - allow_none=True, - desc="get_user_by_id", - ) - def get_users_by_id_case_insensitive(self, user_id): """Gets users that match user_id case insensitively. Returns a mapping of user_id -> password_hash. @@ -304,34 +352,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): return self.runInteraction("delete_access_token", f) - @cached() - def get_user_by_access_token(self, token): - """Get a user from the given access token. - - Args: - token (str): The access token of a user. - Returns: - defer.Deferred: None, if the token did not match, otherwise dict - including the keys `name`, `is_guest`, `device_id`, `token_id`. - """ - return self.runInteraction( - "get_user_by_access_token", - self._query_for_auth, - token - ) - - @defer.inlineCallbacks - def is_server_admin(self, user): - res = yield self._simple_select_one_onecol( - table="users", - keyvalues={"name": user.to_string()}, - retcol="admin", - allow_none=True, - desc="is_server_admin", - ) - - defer.returnValue(res if res else False) - @cachedInlineCallbacks() def is_guest(self, user_id): res = yield self._simple_select_one_onecol( @@ -344,22 +364,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): defer.returnValue(res if res else False) - def _query_for_auth(self, txn, token): - sql = ( - "SELECT users.name, users.is_guest, access_tokens.id as token_id," - " access_tokens.device_id" - " FROM users" - " INNER JOIN access_tokens on users.name = access_tokens.user_id" - " WHERE token = ?" - ) - - txn.execute(sql, (token,)) - rows = self.cursor_to_dict(txn) - if rows: - return rows[0] - - return None - @defer.inlineCallbacks def user_add_threepid(self, user_id, medium, address, validated_at, added_at): yield self._simple_upsert("user_threepids", { -- cgit 1.5.1 From efb79820b4924e13b2c7d1145cf891fd5d441c2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Mar 2018 14:43:29 +0000 Subject: Fix bug with delayed cache invalidation stream We poked the notifier before updated the current token for the cache invalidation stream. This mean that sometimes the update wouldn't be sent until the next time a cache was invalidated. --- synapse/storage/_base.py | 26 ++++++++++++++------------ synapse/storage/event_push_actions.py | 2 +- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 68125006eb..2fbebd4907 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -48,16 +48,16 @@ class LoggingTransaction(object): passed to the constructor. Adds logging and metrics to the .execute() method.""" __slots__ = [ - "txn", "name", "database_engine", "after_callbacks", "final_callbacks", + "txn", "name", "database_engine", "after_callbacks", "exception_callbacks", ] def __init__(self, txn, name, database_engine, after_callbacks, - final_callbacks): + exception_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - object.__setattr__(self, "final_callbacks", final_callbacks) + object.__setattr__(self, "exception_callbacks", exception_callbacks) def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the @@ -66,8 +66,8 @@ class LoggingTransaction(object): """ self.after_callbacks.append((callback, args, kwargs)) - def call_finally(self, callback, *args, **kwargs): - self.final_callbacks.append((callback, args, kwargs)) + def call_on_exception(self, callback, *args, **kwargs): + self.exception_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -215,7 +215,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction(self, conn, desc, after_callbacks, final_callbacks, + def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks, logging_context, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -236,7 +236,7 @@ class SQLBaseStore(object): txn = conn.cursor() txn = LoggingTransaction( txn, name, self.database_engine, after_callbacks, - final_callbacks, + exception_callbacks, ) r = func(txn, *args, **kwargs) conn.commit() @@ -308,11 +308,11 @@ class SQLBaseStore(object): current_context = LoggingContext.current_context() after_callbacks = [] - final_callbacks = [] + exception_callbacks = [] def inner_func(conn, *args, **kwargs): return self._new_transaction( - conn, desc, after_callbacks, final_callbacks, current_context, + conn, desc, after_callbacks, exception_callbacks, current_context, func, *args, **kwargs ) @@ -321,9 +321,10 @@ class SQLBaseStore(object): for after_callback, after_args, after_kwargs in after_callbacks: after_callback(*after_args, **after_kwargs) - finally: - for after_callback, after_args, after_kwargs in final_callbacks: + except: # noqa: E722, as we reraise the exception this is fine. + for after_callback, after_args, after_kwargs in exception_callbacks: after_callback(*after_args, **after_kwargs) + raise defer.returnValue(result) @@ -1000,7 +1001,8 @@ class SQLBaseStore(object): # __exit__ called after the transaction finishes. ctx = self._cache_id_gen.get_next() stream_id = ctx.__enter__() - txn.call_finally(ctx.__exit__, None, None, None) + txn.call_on_exception(ctx.__exit__, None, None, None) + txn.call_after(ctx.__exit__, None, None, None) txn.call_after(self.hs.get_notifier().on_new_replication_data) self._simple_insert_txn( diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 7164293568..912e8db1d3 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -76,7 +76,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): name="_find_stream_orderings_for_times_txn", database_engine=self.database_engine, after_callbacks=[], - final_callbacks=[], + exception_callbacks=[], ) self._find_stream_orderings_for_times_txn(cur) cur.close() -- cgit 1.5.1 From 06a14876e5d78891a5b3ae6c0f454faefd696e79 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 5 Mar 2018 11:53:39 +0000 Subject: Add find_first_stream_ordering_after_ts Expose this as a public function which can be called outside a txn --- synapse/storage/event_push_actions.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 7164293568..54b54dcc6a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -489,6 +489,27 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.stream_ordering_day_ago ) + def find_first_stream_ordering_after_ts(self, ts): + """Gets the stream ordering corresponding to a given timestamp. + + Specifically, finds the stream_ordering of the first event that was + received after the timestamp. This is done by a binary search on the + events table, since there is no index on received_ts, so is + relatively slow. + + Args: + ts (int): timestamp in millis + + Returns: + Deferred[int]: stream ordering of the first event received after + the timestamp + """ + return self.runInteraction( + "_find_first_stream_ordering_after_ts_txn", + self._find_first_stream_ordering_after_ts_txn, + ts, + ) + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): """ Find the stream_ordering of the first event that was received after -- cgit 1.5.1 From c818fcab1152439d79b03494816775b9ee3db613 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 5 Mar 2018 11:47:48 +0000 Subject: Test and fix find_first_stream_ordering_after_ts It seemed to suffer from a bunch of off-by-one errors. --- synapse/storage/event_push_actions.py | 68 +++++++++++++++++++++++++------- tests/storage/test_event_push_actions.py | 67 +++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 15 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 54b54dcc6a..a8c303e11e 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -493,15 +493,15 @@ class EventPushActionsWorkerStore(SQLBaseStore): """Gets the stream ordering corresponding to a given timestamp. Specifically, finds the stream_ordering of the first event that was - received after the timestamp. This is done by a binary search on the - events table, since there is no index on received_ts, so is + received on or after the timestamp. This is done by a binary search on + the events table, since there is no index on received_ts, so is relatively slow. Args: ts (int): timestamp in millis Returns: - Deferred[int]: stream ordering of the first event received after + Deferred[int]: stream ordering of the first event received on/after the timestamp """ return self.runInteraction( @@ -510,16 +510,24 @@ class EventPushActionsWorkerStore(SQLBaseStore): ts, ) - def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + @staticmethod + def _find_first_stream_ordering_after_ts_txn(txn, ts): """ - Find the stream_ordering of the first event that was received after - a given timestamp. This is relatively slow as there is no index on - received_ts but we can then use this to delete push actions before + Find the stream_ordering of the first event that was received on or + after a given timestamp. This is relatively slow as there is no index + on received_ts but we can then use this to delete push actions before this. received_ts must necessarily be in the same order as stream_ordering and stream_ordering is indexed, so we manually binary search using stream_ordering + + Args: + txn (twisted.enterprise.adbapi.Transaction): + ts (int): timestamp to search for + + Returns: + int: stream ordering """ txn.execute("SELECT MAX(stream_ordering) FROM events") max_stream_ordering = txn.fetchone()[0] @@ -527,23 +535,53 @@ class EventPushActionsWorkerStore(SQLBaseStore): if max_stream_ordering is None: return 0 + # We want the first stream_ordering in which received_ts is greater + # than or equal to ts. Call this point X. + # + # We maintain the invariants: + # + # range_start <= X <= range_end + # range_start = 0 - range_end = max_stream_ordering - + range_end = max_stream_ordering + 1 + + # Given a stream_ordering, look up the timestamp at that + # stream_ordering. + # + # The array may be sparse (we may be missing some stream_orderings). + # We treat the gaps as the same as having the same value as the + # preceding entry, because we will pick the lowest stream_ordering + # which satisfies our requirement of received_ts >= ts. + # + # For example, if our array of events indexed by stream_ordering is + # [10, , 20], we should treat this as being equivalent to + # [10, 10, 20]. + # sql = ( "SELECT received_ts FROM events" - " WHERE stream_ordering > ?" - " ORDER BY stream_ordering" + " WHERE stream_ordering <= ?" + " ORDER BY stream_ordering DESC" " LIMIT 1" ) - while range_end - range_start > 1: - middle = int((range_end + range_start) / 2) + while range_end - range_start > 0: + middle = (range_end + range_start) // 2 txn.execute(sql, (middle,)) - middle_ts = txn.fetchone()[0] + row = txn.fetchone() + if row is None: + # no rows with stream_ordering<=middle + range_start = middle + 1 + continue + + middle_ts = row[0] if ts > middle_ts: - range_start = middle + # we got a timestamp lower than the one we were looking for. + # definitely need to look higher: X > middle. + range_start = middle + 1 else: + # we got a timestamp higher than (or the same as) the one we + # were looking for. We aren't yet sure about the point we + # looked up, but we can be sure that X <= middle. range_end = middle return range_end diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 6c1aad149b..dbaaa12e23 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -127,3 +127,70 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): yield _assert_counts(1, 1) yield _rotate(10) yield _assert_counts(1, 1) + + @tests.unittest.DEBUG + @defer.inlineCallbacks + def test_find_first_stream_ordering_after_ts(self): + def add_event(so, ts): + return self.store._simple_insert("events", { + "stream_ordering": so, + "received_ts": ts, + "event_id": "event%i" % so, + "type": "", + "room_id": "", + "content": "", + "processed": True, + "outlier": False, + "topological_ordering": 0, + "depth": 0, + }) + + # start with the base case where there are no events in the table + r = yield self.store.find_first_stream_ordering_after_ts(11) + self.assertEqual(r, 0) + + # now with one event + yield add_event(2, 10) + r = yield self.store.find_first_stream_ordering_after_ts(9) + self.assertEqual(r, 2) + r = yield self.store.find_first_stream_ordering_after_ts(10) + self.assertEqual(r, 2) + r = yield self.store.find_first_stream_ordering_after_ts(11) + self.assertEqual(r, 3) + + # add a bunch of dummy events to the events table + for (stream_ordering, ts) in ( + (3, 110), + (4, 120), + (5, 120), + (10, 130), + (20, 140), + ): + yield add_event(stream_ordering, ts) + + r = yield self.store.find_first_stream_ordering_after_ts(110) + self.assertEqual(r, 3, + "First event after 110ms should be 3, was %i" % r) + + # 4 and 5 are both after 12: we want 4 rather than 5 + r = yield self.store.find_first_stream_ordering_after_ts(120) + self.assertEqual(r, 4, + "First event after 120ms should be 4, was %i" % r) + + r = yield self.store.find_first_stream_ordering_after_ts(129) + self.assertEqual(r, 10, + "First event after 129ms should be 10, was %i" % r) + + # check we can get the last event + r = yield self.store.find_first_stream_ordering_after_ts(140) + self.assertEqual(r, 20, + "First event after 14ms should be 20, was %i" % r) + + # off the end + r = yield self.store.find_first_stream_ordering_after_ts(160) + self.assertEqual(r, 21) + + # check we can find an event at ordering zero + yield add_event(0, 5) + r = yield self.store.find_first_stream_ordering_after_ts(1) + self.assertEqual(r, 0) -- cgit 1.5.1 From 2c911d75e820e321c3e9c885f74ffeabd666b60f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 5 Mar 2018 12:24:49 +0000 Subject: Fix comment typo --- tests/storage/test_event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index dbaaa12e23..575374c6a6 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -172,7 +172,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): self.assertEqual(r, 3, "First event after 110ms should be 3, was %i" % r) - # 4 and 5 are both after 12: we want 4 rather than 5 + # 4 and 5 are both after 120: we want 4 rather than 5 r = yield self.store.find_first_stream_ordering_after_ts(120) self.assertEqual(r, 4, "First event after 120ms should be 4, was %i" % r) -- cgit 1.5.1 From f8bfcd7e0d2fc6399eb654a41773cd603b4037fc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 1 Mar 2018 23:20:54 +0000 Subject: Provide a means to pass a timestamp to purge_history --- docs/admin_api/purge_history_api.rst | 11 +++++-- synapse/handlers/message.py | 14 ++++----- synapse/rest/client/v1/admin.py | 58 ++++++++++++++++++++++++++++++++++-- synapse/storage/stream.py | 27 +++++++++++++++++ 4 files changed, 96 insertions(+), 14 deletions(-) diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index a3a17e9f9f..acf1bc5749 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. -The API is simply: +The API is: -``POST /_matrix/client/r0/admin/purge_history//`` +``POST /_matrix/client/r0/admin/purge_history/[/]`` including an ``access_token`` of a server admin. @@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body: { "delete_local_events": true } + +The caller must specify the point in the room to purge up to. This can be +specified by including an event_id in the URI, or by setting a +``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event +id is given, that event (and others at the same graph depth) will be retained. +If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch, +in milliseconds. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d28c2745c..dd00d8a86c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -52,16 +52,12 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() @defer.inlineCallbacks - def purge_history(self, room_id, event_id, delete_local_events=False): - event = yield self.store.get_event(event_id) - - if event.room_id != room_id: - raise SynapseError(400, "Event is for wrong room.") - - depth = event.depth - + def purge_history(self, room_id, topological_ordering, + delete_local_events=False): with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth, delete_local_events) + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 3917eee42d..dcf6215dad 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError +from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet): class PurgeHistoryRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns( - "/admin/purge_history/(?P[^/]*)/(?P[^/]*)" + "/admin/purge_history/(?P[^/]*)(/(?P[^/]+))?" ) def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ super(PurgeHistoryRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.store = hs.get_datastore() @defer.inlineCallbacks def on_POST(self, request, room_id, event_id): @@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): delete_local_events = bool(body.get("delete_local_events", False)) + # establish the topological ordering we should keep events from. The + # user can provide an event_id in the URL or the request body, or can + # provide a timestamp in the request body. + if event_id is None: + event_id = body.get('purge_up_to_event_id') + + if event_id is not None: + event = yield self.store.get_event(event_id) + + if event.room_id != room_id: + raise SynapseError(400, "Event is for wrong room.") + + depth = event.depth + logger.info( + "[purge] purging up to depth %i (event_id %s)", + depth, event_id, + ) + elif 'purge_up_to_ts' in body: + ts = body['purge_up_to_ts'] + if not isinstance(ts, int): + raise SynapseError( + 400, "purge_up_to_ts must be an int", + errcode=Codes.BAD_JSON, + ) + + stream_ordering = ( + yield self.store.find_first_stream_ordering_after_ts(ts) + ) + + (_, depth, _) = ( + yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, + ) + ) + logger.info( + "[purge] purging up to depth %i (received_ts %i => " + "stream_ordering %i)", + depth, ts, stream_ordering, + ) + else: + raise SynapseError( + 400, + "must specify purge_up_to_event_id or purge_up_to_ts", + errcode=Codes.BAD_JSON, + ) + yield self.handlers.message_handler.purge_history( - room_id, event_id, + room_id, depth, delete_local_events=delete_local_events, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a2527d2a36..515a04699a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -416,6 +416,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + def get_room_event_after_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or after a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + def _f(txn): + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering >= ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + txn.execute(sql, (room_id, stream_ordering, )) + return txn.fetchone() + + return self.runInteraction( + "get_room_event_after_stream_ordering", _f, + ) + @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): """Returns the current token for rooms stream. -- cgit 1.5.1 From 2e223163ff59fa7d6030654dd8c74e58f8aa1deb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 15:11:30 +0000 Subject: Split Directory store --- synapse/replication/slave/storage/directory.py | 8 ++--- synapse/storage/directory.py | 45 +++++++++++++------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py index 7301d885f2..6deecd3963 100644 --- a/synapse/replication/slave/storage/directory.py +++ b/synapse/replication/slave/storage/directory.py @@ -14,10 +14,8 @@ # limitations under the License. from ._base import BaseSlavedStore -from synapse.storage.directory import DirectoryStore +from synapse.storage.directory import DirectoryWorkerStore -class DirectoryStore(BaseSlavedStore): - get_aliases_for_room = DirectoryStore.__dict__[ - "get_aliases_for_room" - ] +class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 79e7c540ad..041b0b457e 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -29,8 +29,7 @@ RoomAliasMapping = namedtuple( ) -class DirectoryStore(SQLBaseStore): - +class DirectoryWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_association_from_room_alias(self, room_alias): """ Get's the room_id and server list for a given room_alias @@ -69,6 +68,28 @@ class DirectoryStore(SQLBaseStore): RoomAliasMapping(room_id, room_alias.to_string(), servers) ) + def get_room_alias_creator(self, room_alias): + return self._simple_select_one_onecol( + table="room_aliases", + keyvalues={ + "room_alias": room_alias, + }, + retcol="creator", + desc="get_room_alias_creator", + allow_none=True + ) + + @cached(max_entries=5000) + def get_aliases_for_room(self, room_id): + return self._simple_select_onecol( + "room_aliases", + {"room_id": room_id}, + "room_alias", + desc="get_aliases_for_room", + ) + + +class DirectoryStore(DirectoryWorkerStore): @defer.inlineCallbacks def create_room_alias_association(self, room_alias, room_id, servers, creator=None): """ Creates an associatin between a room alias and room_id/servers @@ -116,17 +137,6 @@ class DirectoryStore(SQLBaseStore): ) defer.returnValue(ret) - def get_room_alias_creator(self, room_alias): - return self._simple_select_one_onecol( - table="room_aliases", - keyvalues={ - "room_alias": room_alias, - }, - retcol="creator", - desc="get_room_alias_creator", - allow_none=True - ) - @defer.inlineCallbacks def delete_room_alias(self, room_alias): room_id = yield self.runInteraction( @@ -162,15 +172,6 @@ class DirectoryStore(SQLBaseStore): return room_id - @cached(max_entries=5000) - def get_aliases_for_room(self, room_id): - return self._simple_select_onecol( - "room_aliases", - {"room_id": room_id}, - "room_alias", - desc="get_aliases_for_room", - ) - def update_aliases_for_room(self, old_room_id, new_room_id, creator): def _update_aliases_for_room_txn(txn): sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?" -- cgit 1.5.1 From 69ce365b793ac35c8e9ee6c3b3a0d04d0e61db4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 15:12:22 +0000 Subject: Fix cache invalidation on deletion --- synapse/storage/directory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 041b0b457e..d0c0059757 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -145,7 +145,6 @@ class DirectoryStore(DirectoryWorkerStore): room_alias, ) - self.get_aliases_for_room.invalidate((room_id,)) defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): @@ -170,6 +169,10 @@ class DirectoryStore(DirectoryWorkerStore): (room_alias.to_string(),) ) + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (room_id,) + ) + return room_id def update_aliases_for_room(self, old_room_id, new_room_id, creator): -- cgit 1.5.1 From d4ffe61d4fb71953bff0f94ff5d1603afe7d46f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 15:42:57 +0000 Subject: Remove ability for AS users to call /events and /sync This functionality has been deprecated for a while as well as being broken for a while. Instead of fixing it lets just remove it entirely. See: https://github.com/matrix-org/matrix-doc/issues/1144 --- synapse/handlers/room.py | 9 ++--- synapse/handlers/sync.py | 10 +++--- synapse/storage/appservice.py | 82 +++---------------------------------------- synapse/storage/stream.py | 76 --------------------------------------- 4 files changed, 14 insertions(+), 163 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6c425828c1..8df8fcbbad 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -475,12 +475,9 @@ class RoomEventSource(object): user.to_string() ) if app_service: - events, end_key = yield self.store.get_appservice_room_stream( - service=app_service, - from_key=from_key, - to_key=to_key, - limit=limit, - ) + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() else: room_events = yield self.store.get_membership_changes_for_user( user.to_string(), from_key, to_key diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b12988f3c9..56b86356f2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -998,8 +998,9 @@ class SyncHandler(object): app_service = self.store.get_app_service_by_user_id(user_id) if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() else: joined_room_ids = yield self.store.get_rooms_for_user(user_id) @@ -1030,8 +1031,9 @@ class SyncHandler(object): app_service = self.store.get_app_service_by_user_id(user_id) if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() else: joined_room_ids = yield self.store.get_rooms_for_user(user_id) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 90fb51d43c..12ea8a158c 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -18,11 +18,9 @@ import re import simplejson as json from twisted.internet import defer -from synapse.api.constants import Membership from synapse.appservice import AppServiceTransaction from synapse.config.appservice import load_appservices from synapse.storage.events import EventsWorkerStore -from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore): class ApplicationServiceStore(ApplicationServiceWorkerStore): - - def __init__(self, db_conn, hs): - super(ApplicationServiceStore, self).__init__(db_conn, hs) - self.hostname = hs.hostname - - def get_app_service_rooms(self, service): - """Get a list of RoomsForUser for this application service. - - Application services may be "interested" in lots of rooms depending on - the room ID, the room aliases, or the members in the room. This function - takes all of these into account and returns a list of RoomsForUser which - represent the entire list of room IDs that this application service - wants to know about. - - Args: - service: The application service to get a room list for. - Returns: - A list of RoomsForUser. - """ - return self.runInteraction( - "get_app_service_rooms", - self._get_app_service_rooms_txn, - service, - ) - - def _get_app_service_rooms_txn(self, txn, service): - # get all rooms matching the room ID regex. - room_entries = self._simple_select_list_txn( - txn=txn, table="rooms", keyvalues=None, retcols=["room_id"] - ) - matching_room_list = set([ - r["room_id"] for r in room_entries if - service.is_interested_in_room(r["room_id"]) - ]) - - # resolve room IDs for matching room alias regex. - room_alias_mappings = self._simple_select_list_txn( - txn=txn, table="room_aliases", keyvalues=None, - retcols=["room_id", "room_alias"] - ) - matching_room_list |= set([ - r["room_id"] for r in room_alias_mappings if - service.is_interested_in_alias(r["room_alias"]) - ]) - - # get all rooms for every user for this AS. This is scoped to users on - # this HS only. - user_list = self._simple_select_list_txn( - txn=txn, table="users", keyvalues=None, retcols=["name"] - ) - user_list = [ - u["name"] for u in user_list if - service.is_interested_in_user(u["name"]) - ] - rooms_for_user_matching_user_id = set() # RoomsForUser list - for user_id in user_list: - # FIXME: This assumes this store is linked with RoomMemberStore :( - rooms_for_user = self._get_rooms_for_user_where_membership_is_txn( - txn=txn, - user_id=user_id, - membership_list=[Membership.JOIN] - ) - rooms_for_user_matching_user_id |= set(rooms_for_user) - - # make RoomsForUser tuples for room ids and aliases which are not in the - # main rooms_for_user_list - e.g. they are rooms which do not have AS - # registered users in it. - known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] - missing_rooms_for_user = [ - RoomsForUser(r, service.sender, "join") for r in - matching_room_list if r not in known_room_ids - ] - rooms_for_user_matching_user_id |= set(missing_rooms_for_user) - - return rooms_for_user_matching_user_id + # This is currently empty due to there not being any AS storage functions + # that can't be run on the workers. Since this may change in future, and + # to keep consistency with the other stores, we keep this empty class for + # now. + pass class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a2527d2a36..b78151cd82 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,6 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached -from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, preserve_fn @@ -717,81 +716,6 @@ class StreamStore(StreamWorkerStore): def get_room_min_stream_ordering(self): return self._backfill_id_gen.get_current_token() - @defer.inlineCallbacks - def get_appservice_room_stream(self, service, from_key, to_key, limit=0): - # NB this lives here instead of appservice.py so we can reuse the - # 'private' StreamToken class in this file. - if limit: - limit = max(limit, MAX_STREAM_SIZE) - else: - limit = MAX_STREAM_SIZE - - # From and to keys should be integers from ordering. - from_id = RoomStreamToken.parse_stream_token(from_key) - to_id = RoomStreamToken.parse_stream_token(to_key) - - if from_key == to_key: - defer.returnValue(([], to_key)) - return - - # select all the events between from/to with a sensible limit - sql = ( - "SELECT e.event_id, e.room_id, e.type, s.state_key, " - "e.stream_ordering FROM events AS e " - "LEFT JOIN state_events as s ON " - "e.event_id = s.event_id " - "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " - "ORDER BY stream_ordering ASC LIMIT %(limit)d " - ) % { - "limit": limit - } - - def f(txn): - # pull out all the events between the tokens - txn.execute(sql, (from_id.stream, to_id.stream,)) - rows = self.cursor_to_dict(txn) - - # Logic: - # - We want ALL events which match the AS room_id regex - # - We want ALL events which match the rooms represented by the AS - # room_alias regex - # - We want ALL events for rooms that AS users have joined. - # This is currently supported via get_app_service_rooms (which is - # used for the Notifier listener rooms). We can't reasonably make a - # SQL query for these room IDs, so we'll pull all the events between - # from/to and filter in python. - rooms_for_as = self._get_app_service_rooms_txn(txn, service) - room_ids_for_as = [r.room_id for r in rooms_for_as] - - def app_service_interested(row): - if row["room_id"] in room_ids_for_as: - return True - - if row["type"] == EventTypes.Member: - if service.is_interested_in_user(row.get("state_key")): - return True - return False - - return [r for r in rows if app_service_interested(r)] - - rows = yield self.runInteraction("get_appservice_room_stream", f) - - ret = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - - self._set_before_and_after(ret, rows, topo_order=from_id is None) - - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key - - defer.returnValue((ret, key)) - @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None): -- cgit 1.5.1 From ed9b5eced4f17dfb0a92167a6281e13054821d6f Mon Sep 17 00:00:00 2001 From: Krombel Date: Mon, 5 Mar 2018 17:51:09 +0100 Subject: use bcrypt.checkpw in bcrypt 3.1.0 checkpw got introduced (already 2 years ago) This makes use of that with enhancements which might get introduced by that Signed-Off-by: Matthias Kesler --- synapse/handlers/auth.py | 6 ++++-- synapse/python_dependencies.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 258cc345dc..a5365c4fe4 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -863,8 +863,10 @@ class AuthHandler(BaseHandler): """ def _do_validate_hash(): - return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, - stored_hash.encode('utf8')) == stored_hash + return bcrypt.checkpw( + password.encode('utf8') + self.hs.config.password_pepper, + stored_hash.encode('utf8') + ) if stored_hash: return make_deferred_yieldable(threads.deferToThread(_do_validate_hash)) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 5d65b5fd6e..91179ce532 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -31,7 +31,7 @@ REQUIREMENTS = { "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], "daemonize": ["daemonize"], - "bcrypt": ["bcrypt"], + "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], "ujson": ["ujson"], -- cgit 1.5.1 From 8cb44da4aa569188faa2a94aae6bc093aa8e22ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 12:06:19 +0000 Subject: Fix race in sync when joining room The race happens when the user joins a room at the same time as doing a sync. We fetch the current token and then get the rooms the user is in. If the join happens after the current token, but before we get the rooms we end up sending down a partial room entry in the sync. This is fixed by looking at the stream ordering of the membership returned by get_rooms_for_user, and handling the case when that stream ordering is after the current token. --- synapse/handlers/sync.py | 103 ++++++++++++++++++++++++++++++------------ synapse/storage/events.py | 2 +- synapse/storage/roommember.py | 27 ++++++++++- 3 files changed, 102 insertions(+), 30 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 56b86356f2..163d80417e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -235,10 +235,10 @@ class SyncHandler(object): defer.returnValue(rules) @defer.inlineCallbacks - def ephemeral_by_room(self, sync_config, now_token, since_token=None): + def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): """Get the ephemeral events for each room the user is in Args: - sync_config (SyncConfig): The flags, filters and user for the sync. + sync_result_builder(SyncResultBuilder) now_token (StreamToken): Where the server is currently up to. since_token (StreamToken): Where the server was when the client last synced. @@ -248,10 +248,12 @@ class SyncHandler(object): typing events for that room. """ + sync_config = sync_result_builder.sync_config + with Measure(self.clock, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else "0" - room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = sync_result_builder.joined_room_ids typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( @@ -565,10 +567,22 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + else: + joined_room_ids = yield self.get_rooms_for_user_at( + user_id, now_token.room_stream_id, + ) + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, + joined_room_ids=joined_room_ids, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( @@ -603,7 +617,6 @@ class SyncHandler(object): device_id = sync_config.device_id one_time_key_counts = {} if device_id: - user_id = sync_config.user.to_string() one_time_key_counts = yield self.store.count_e2e_one_time_keys( user_id, device_id ) @@ -891,7 +904,7 @@ class SyncHandler(object): ephemeral_by_room = {} else: now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, + sync_result_builder, now_token=sync_result_builder.now_token, since_token=sync_result_builder.since_token, ) @@ -996,16 +1009,8 @@ class SyncHandler(object): if rooms_changed: defer.returnValue(True) - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): defer.returnValue(True) defer.returnValue(False) @@ -1029,14 +1034,6 @@ class SyncHandler(object): assert since_token - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key @@ -1059,7 +1056,7 @@ class SyncHandler(object): # we do send down the room, and with full state, where necessary old_state_ids = None - if room_id in joined_room_ids and non_joins: + if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially # important so that device list changes are calculated correctly. # If there are non join member events, but we are still in the room, @@ -1069,7 +1066,7 @@ class SyncHandler(object): # User is in the room so we don't need to do the invite/leave checks continue - if room_id in joined_room_ids or has_join: + if room_id in sync_result_builder.joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None @@ -1081,7 +1078,7 @@ class SyncHandler(object): newly_joined_rooms.append(room_id) # If user is in the room then we don't need to do the invite/leave checks - if room_id in joined_room_ids: + if room_id in sync_result_builder.joined_room_ids: continue if not non_joins: @@ -1148,7 +1145,7 @@ class SyncHandler(object): # Get all events for rooms we're currently joined to. room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, + room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, @@ -1156,7 +1153,7 @@ class SyncHandler(object): # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: room_entry = room_to_events.get(room_id, None) if room_entry: @@ -1364,6 +1361,54 @@ class SyncHandler(object): else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) + @defer.inlineCallbacks + def get_rooms_for_user_at(self, user_id, stream_ordering): + """Get set of joined rooms for a user at the given stream ordering. + + The stream ordering *must* be recent, otherwise this may throw an + exception if older than a month. (This function is called with the + current token, which should be perfectly fine). + + Args: + user_id (str) + stream_ordering (int) + + ReturnValue: + Deferred[frozenset[str]]: Set of room_ids the user is in at given + stream_ordering. + """ + joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering( + user_id, + ) + + joined_room_ids = set() + + # We need to check that the stream ordering of the join for each room + # is before the stream_ordering asked for. This might not be the case + # if the user joins a room between us getting the current token and + # calling `get_rooms_for_user_with_stream_ordering`. + # If the membership's stream ordering is after the given stream + # ordering, we need to go and work out if the user was in the room + # before. + for room_id, membeship_stream_ordering in joined_rooms: + if membeship_stream_ordering <= stream_ordering: + joined_room_ids.add(room_id) + continue + + logger.info("SH joined_room_ids membership after current token") + + extrems = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering, + ) + users_in_room = yield self.state.get_current_user_in_room( + room_id, extrems, + ) + if user_id in users_in_room: + joined_room_ids.add(room_id) + + joined_room_ids = frozenset(joined_room_ids) + defer.returnValue(joined_room_ids) + def _action_has_highlight(actions): for action in actions: @@ -1413,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" - def __init__(self, sync_config, full_state, since_token, now_token): + def __init__(self, sync_config, full_state, since_token, now_token, + joined_room_ids): """ Args: sync_config(SyncConfig) @@ -1425,6 +1471,7 @@ class SyncResultBuilder(object): self.full_state = full_state self.since_token = since_token self.now_token = now_token + self.joined_room_ids = joined_room_ids self.presence = [] self.account_data = [] diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 057b1be4d5..826fad307e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -754,7 +754,7 @@ class EventsStore(EventsWorkerStore): for member in members_changed: self._invalidate_cache_and_stream( - txn, self.get_rooms_for_user, (member,) + txn, self.get_rooms_for_user_with_stream_ordering, (member,) ) for host in set(get_domain_from_id(u) for u in members_changed): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d79877dac7..52e19e16b0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -38,6 +38,11 @@ RoomsForUser = namedtuple( ("room_id", "sender", "membership", "event_id", "stream_ordering") ) +GetRoomsForUserWithStreamOrdering = namedtuple( + "_GetRoomsForUserWithStreamOrdering", + ("room_id", "stream_ordering",) +) + # We store this using a namedtuple so that we save about 3x space over using a # dict. @@ -181,12 +186,32 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results @cachedInlineCallbacks(max_entries=500000, iterable=True) - def get_rooms_for_user(self, user_id): + def get_rooms_for_user_with_stream_ordering(self, user_id): """Returns a set of room_ids the user is currently joined to + + Args: + user_id (str) + + Returns: + Deferred[frozenset[GetRoomsForUserWithStreamOrdering]]: Returns + the rooms the user is in currently, along with the stream ordering + of the most recent join for that user and room. """ rooms = yield self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], ) + defer.returnValue(frozenset( + GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) + for r in rooms + )) + + @defer.inlineCallbacks + def get_rooms_for_user(self, user_id, on_invalidate=None): + """Returns a set of room_ids the user is currently joined to + """ + rooms = yield self.get_rooms_for_user_with_stream_ordering( + user_id, on_invalidate=on_invalidate, + ) defer.returnValue(frozenset(r.room_id for r in rooms)) @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True) -- cgit 1.5.1 From 02a1296ad634ff8200abe539d27a53a6f850081d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 13:12:08 +0000 Subject: Fix typo --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 163d80417e..b323e0e6b0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1390,8 +1390,8 @@ class SyncHandler(object): # If the membership's stream ordering is after the given stream # ordering, we need to go and work out if the user was in the room # before. - for room_id, membeship_stream_ordering in joined_rooms: - if membeship_stream_ordering <= stream_ordering: + for room_id, membership_stream_ordering in joined_rooms: + if membership_stream_ordering <= stream_ordering: joined_room_ids.add(room_id) continue -- cgit 1.5.1 From a56d54dcb7d3d42fc417d6af82a46d86edf6f73d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Mar 2018 13:29:49 +0000 Subject: Fix up log message --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b323e0e6b0..0f713ce038 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1395,7 +1395,7 @@ class SyncHandler(object): joined_room_ids.add(room_id) continue - logger.info("SH joined_room_ids membership after current token") + logger.info("User joined room after current token: %s", room_id) extrems = yield self.store.get_forward_extremeties_for_room( room_id, stream_ordering, -- cgit 1.5.1 From 20f40348d4ea55cc5b98528673e26bac7396a3cb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 7 Mar 2018 19:59:24 +0000 Subject: Factor run_in_background out from preserve_fn It annoys me that we create temporary function objects when there's really no need for it. Let's factor the gubbins out of preserve_fn and start using it. --- docs/log_contexts.rst | 8 +++---- synapse/util/logcontext.py | 53 +++++++++++++++++++++++++--------------------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index b19b7fa1ea..82ac4f91e5 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -279,9 +279,9 @@ Obviously that option means that the operations done in that might be fixed by setting a different logcontext via a ``with LoggingContext(...)`` in ``background_operation``). -The second option is to use ``logcontext.preserve_fn``, which wraps a function -so that it doesn't reset the logcontext even when it returns an incomplete -deferred, and adds a callback to the returned deferred to reset the +The second option is to use ``logcontext.run_in_background``, which wraps a +function so that it doesn't reset the logcontext even when it returns an +incomplete deferred, and adds a callback to the returned deferred to reset the logcontext. In other words, it turns a function that follows the Synapse rules about logcontexts and Deferreds into one which behaves more like an external function — the opposite operation to that described in the previous section. @@ -293,7 +293,7 @@ It can be used like this: def do_request_handling(): yield foreground_operation() - logcontext.preserve_fn(background_operation)() + logcontext.run_in_background(background_operation) # this will now be logged against the request context logger.debug("Request handling complete") diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a8dea15c1b..d660ec785b 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -292,36 +292,41 @@ class PreserveLoggingContext(object): def preserve_fn(f): - """Wraps a function, to ensure that the current context is restored after + """Function decorator which wraps the function with run_in_background""" + def g(*args, **kwargs): + return run_in_background(f, *args, **kwargs) + return g + + +def run_in_background(f, *args, **kwargs): + """Calls a function, ensuring that the current context is restored after return from the function, and that the sentinel context is set once the deferred returned by the funtion completes. Useful for wrapping functions that return a deferred which you don't yield on. """ - def g(*args, **kwargs): - current = LoggingContext.current_context() - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred) and not res.called: - # The function will have reset the context before returning, so - # we need to restore it now. - LoggingContext.set_current_context(current) - - # The original context will be restored when the deferred - # completes, but there is nothing waiting for it, so it will - # get leaked into the reactor or some other function which - # wasn't expecting it. We therefore need to reset the context - # here. - # - # (If this feels asymmetric, consider it this way: we are - # effectively forking a new thread of execution. We are - # probably currently within a ``with LoggingContext()`` block, - # which is supposed to have a single entry and exit point. But - # by spawning off another deferred, we are effectively - # adding a new exit point.) - res.addBoth(_set_context_cb, LoggingContext.sentinel) - return res - return g + current = LoggingContext.current_context() + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(_set_context_cb, LoggingContext.sentinel) + return res def make_deferred_yieldable(deferred): -- cgit 1.5.1 From 1708412f569dc28931a3704d679b41b92ac788b9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 7 Mar 2018 17:32:46 +0000 Subject: Return an error when doing two purges on a room Queuing up purges doesn't sound like a good thing. --- synapse/handlers/message.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dd00d8a86c..6eb8d19dc9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,15 +50,26 @@ class MessageHandler(BaseHandler): self.clock = hs.get_clock() self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() @defer.inlineCallbacks def purge_history(self, room_id, topological_ordering, delete_local_events=False): - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, topological_ordering, delete_local_events, + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), ) + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) + finally: + self._purges_in_progress_by_room.discard(room_id) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True, event_filter=None): -- cgit 1.5.1 From e48c7aac4d827b66182adf80ab9804f42db186c9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Mar 2018 11:47:28 +0000 Subject: Add transactional API to history purge Make the purge request return quickly, and allow scripts to poll for updates. --- docs/admin_api/purge_history_api.rst | 27 +++++++++ synapse/handlers/message.py | 104 +++++++++++++++++++++++++++++++++-- synapse/rest/client/v1/admin.py | 38 ++++++++++++- 3 files changed, 161 insertions(+), 8 deletions(-) diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index acf1bc5749..ea2922da5c 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a id is given, that event (and others at the same graph depth) will be retained. If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch, in milliseconds. + +The API starts the purge running, and returns immediately with a JSON body with +a purge id: + +.. code:: json + + { + "purge_id": "" + } + +Purge status query +------------------ + +It is possible to poll for updates on recent purges with a second API; + +``GET /_matrix/client/r0/admin/purge_history_status/`` + +(again, with a suitable ``access_token``). This API returns a JSON body like +the following: + +.. code:: json + + { + "status": "active" + } + +The status will be one of ``active``, ``complete``, or ``failed``. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6eb8d19dc9..42aab91c50 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,7 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -24,9 +25,10 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import unfreeze +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client from synapse.replication.http.send_event import send_event_to_master @@ -41,6 +43,36 @@ import ujson logger = logging.getLogger(__name__) +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -51,25 +83,87 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} - @defer.inlineCallbacks - def purge_history(self, room_id, topological_ordering, - delete_local_events=False): + def start_purge_history(self, room_id, topological_ordering, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ if room_id in self._purges_in_progress_by_room: raise SynapseError( 400, "History purge already in progress for %s" % (room_id, ), ) + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, topological_ordering, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, topological_ordering, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ self._purges_in_progress_by_room.add(room_id) try: with (yield self.pagination_lock.write(room_id)): yield self.store.purge_history( room_id, topological_ordering, delete_local_events, ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: self._purges_in_progress_by_room.discard(room_id) + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + reactor.callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True, event_filter=None): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index dcf6215dad..303419d281 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - yield self.handlers.message_handler.purge_history( + purge_id = yield self.handlers.message_handler.start_purge_history( room_id, depth, delete_local_events=delete_local_events, ) - defer.returnValue((200, {})) + defer.returnValue((200, { + "purge_id": purge_id, + })) + + +class PurgeHistoryStatusRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns( + "/admin/purge_history_status/(?P[^/]+)" + ) + + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ + super(PurgeHistoryStatusRestServlet, self).__init__(hs) + self.handlers = hs.get_handlers() + + @defer.inlineCallbacks + def on_GET(self, request, purge_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + purge_status = self.handlers.message_handler.get_purge_status(purge_id) + if purge_status is None: + raise NotFoundError("purge id '%s' not found" % purge_id) + + defer.returnValue((200, purge_status.asdict())) class DeactivateAccountRestServlet(ClientV1RestServlet): @@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet): def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) + PurgeHistoryStatusRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) PurgeHistoryRestServlet(hs).register(http_server) UsersRestServlet(hs).register(http_server) -- cgit 1.5.1 From 889a2a853a83057d4f218bb828bf686db786d590 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 13 Mar 2018 09:57:54 +0000 Subject: Add Measure block for persist_events This seems like a useful thing to measure. --- synapse/storage/events.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 826fad307e..3890878170 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -283,10 +283,11 @@ class EventsStore(EventsWorkerStore): def _maybe_start_persisting(self, room_id): @defer.inlineCallbacks def persisting_queue(item): - yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, - ) + with Measure(self._clock, "persist_events"): + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) self._event_persist_queue.handle_queue(room_id, persisting_queue) -- cgit 1.5.1 From c3f79c9da56931453ab86a4c726da5a02f18fe1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 16:17:08 +0000 Subject: Split out edu/query registration to a separate class --- synapse/federation/federation_server.py | 117 +++++++++++++++++++------------- synapse/handlers/device.py | 6 +- synapse/handlers/devicemessage.py | 2 +- synapse/handlers/directory.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/presence.py | 10 +-- synapse/handlers/profile.py | 2 +- synapse/handlers/receipts.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/server.py | 5 ++ 10 files changed, 90 insertions(+), 60 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9849953c9b..5b1914f2f4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -17,7 +17,7 @@ import logging import simplejson as json from twisted.internet import defer -from synapse.api.errors import AuthError, FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError from synapse.crypto.event_signing import compute_event_signature from synapse.federation.federation_base import ( FederationBase, @@ -56,6 +56,8 @@ class FederationServer(FederationBase): self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self.registry = hs.get_federation_registry() + # We cache responses to state queries, as they take a while and often # come in waves. self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) @@ -67,35 +69,6 @@ class FederationServer(FederationBase): """ self.handler = handler - def register_edu_handler(self, edu_type, handler): - if edu_type in self.edu_handlers: - raise KeyError("Already have an EDU handler for %s" % (edu_type,)) - - self.edu_handlers[edu_type] = handler - - def register_query_handler(self, query_type, handler): - """Sets the handler callable that will be used to handle an incoming - federation Query of the given type. - - Args: - query_type (str): Category name of the query, which should match - the string used by make_query. - handler (callable): Invoked to handle incoming queries of this type - - handler is invoked as: - result = handler(args) - - where 'args' is a dict mapping strings to strings of the query - arguments. It should return a Deferred that will eventually yield an - object to encode as JSON. - """ - if query_type in self.query_handlers: - raise KeyError( - "Already have a Query handler for %s" % (query_type,) - ) - - self.query_handlers[query_type] = handler - @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): @@ -229,16 +202,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): received_edus_counter.inc() - - if edu_type in self.edu_handlers: - try: - yield self.edu_handlers[edu_type](origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception as e: - logger.exception("Failed to handle edu %r", edu_type) - else: - logger.warn("Received EDU of type %s with no handler", edu_type) + yield self.registry.on_edu(edu_type, origin, content) @defer.inlineCallbacks @log_function @@ -328,14 +292,8 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): received_queries_counter.inc(query_type) - - if query_type in self.query_handlers: - response = yield self.query_handlers[query_type](args) - defer.returnValue((200, response)) - else: - defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type,)) - ) + resp = yield self.registry.on_query(query_type, args) + defer.returnValue((200, resp)) @defer.inlineCallbacks def on_make_join_request(self, room_id, user_id): @@ -607,3 +565,66 @@ class FederationServer(FederationBase): origin, room_id, event_dict ) defer.returnValue(ret) + + +class FederationHandlerRegistry(object): + """Allows classes to register themselves as handlers for a given EDU or + query type for incoming federation traffic. + """ + def __init__(self): + self.edu_handlers = {} + self.query_handlers = {} + + def register_edu_handler(self, edu_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation EDU of the given type. + + Args: + edu_type (str): The type of the incoming EDU to register handler for + handler (Callable[str, dict]): A callable invoked on incoming EDU + of the given type. The arguments are the origin server name and + the EDU contents. + """ + if edu_type in self.edu_handlers: + raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + + self.edu_handlers[edu_type] = handler + + def register_query_handler(self, query_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation query of the given type. + + Args: + query_type (str): Category name of the query, which should match + the string used by make_query. + handler (Callable[dict] -> Deferred[dict]): Invoked to handle + incoming queries of this type. The return will be yielded + on and the result used as the response to the query request. + """ + if query_type in self.query_handlers: + raise KeyError( + "Already have a Query handler for %s" % (query_type,) + ) + + self.query_handlers[query_type] = handler + + @defer.inlineCallbacks + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if not handler: + logger.warn("No handler registered for EDU type %s", edu_type) + + try: + yield handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception as e: + logger.exception("Failed to handle edu %r", edu_type) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if not handler: + logger.warn("No handler registered for query type %s", query_type) + raise NotFoundError("No handler for Query type '%s'" % (query_type,)) + + return handler(args) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0e83453851..9e58dbe64e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -41,10 +41,12 @@ class DeviceHandler(BaseHandler): self._edu_updater = DeviceListEduUpdater(hs, self) - self.federation.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.device_list_update", self._edu_updater.incoming_device_list_update, ) - self.federation.register_query_handler( + federation_registry.register_query_handler( "user_devices", self.on_federation_query_user_devices, ) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index d996aa90bb..f147a20b73 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -37,7 +37,7 @@ class DeviceMessageHandler(object): self.is_mine = hs.is_mine self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.direct_to_device", self.on_direct_to_device_edu ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8580ada60a..e955cb1f3c 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -37,7 +37,7 @@ class DirectoryHandler(BaseHandler): self.event_creation_handler = hs.get_event_creation_handler() self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9aa95f89e6..57f50a4e27 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -40,7 +40,7 @@ class E2eKeysHandler(object): # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the # "query handler" interface. - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "client_keys", self.on_federation_query_client_keys ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cb158ba962..b11ae78350 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -98,24 +98,26 @@ class PresenceHandler(object): self.state = hs.get_state_handler() - self.replication.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.presence", self.incoming_presence ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( observed_user=UserID.from_string(content["observed_user"]), diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c9c2879038..c386c79bbd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -32,7 +32,7 @@ class ProfileHandler(BaseHandler): super(ProfileHandler, self).__init__(hs) self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 0525765272..3f215c2b4e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler): self.store = hs.get_datastore() self.hs = hs self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.receipt", self._received_remote_receipt ) self.clock = self.hs.get_clock() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 82dedbbc99..77c0cf146f 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -56,7 +56,7 @@ class TypingHandler(object): self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu) + hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) hs.get_distributor().observe("user_left_room", self.user_left_room) diff --git a/synapse/server.py b/synapse/server.py index 5b6effbe31..1bc8d6f702 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -34,6 +34,7 @@ from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker from synapse.federation import initialize_http_replication from synapse.federation.send_queue import FederationRemoteSendQueue +from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.transport.client import TransportLayerClient from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers @@ -147,6 +148,7 @@ class HomeServer(object): 'groups_attestation_renewer', 'spam_checker', 'room_member_handler', + 'federation_registry', ] def __init__(self, hostname, **kwargs): @@ -387,6 +389,9 @@ class HomeServer(object): def build_room_member_handler(self): return RoomMemberHandler(self) + def build_federation_registry(self): + return FederationHandlerRegistry() + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From 631a73f7ef7d89c43be47cf01c7c27a1d633052d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 10:39:19 +0000 Subject: Fix tests --- tests/handlers/test_directory.py | 9 ++++----- tests/handlers/test_profile.py | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index 5712773909..b103921498 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -35,21 +35,20 @@ class DirectoryTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.mock_federation = Mock(spec=[ - "make_query", - "register_edu_handler", - ]) + self.mock_federation = Mock() + self.mock_registry = Mock() self.query_handlers = {} def register_query_handler(query_type, handler): self.query_handlers[query_type] = handler - self.mock_federation.register_query_handler = register_query_handler + self.mock_registry.register_query_handler = register_query_handler hs = yield setup_test_homeserver( http_client=None, resource_for_federation=Mock(), replication_layer=self.mock_federation, + federation_registry=self.mock_registry, ) hs.handlers = DirectoryHandlers(hs) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index a5f47181d7..73223ffbd3 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -37,23 +37,22 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - self.mock_federation = Mock(spec=[ - "make_query", - "register_edu_handler", - ]) + self.mock_federation = Mock() + self.mock_registry = Mock() self.query_handlers = {} def register_query_handler(query_type, handler): self.query_handlers[query_type] = handler - self.mock_federation.register_query_handler = register_query_handler + self.mock_registry.register_query_handler = register_query_handler hs = yield setup_test_homeserver( http_client=None, handlers=None, resource_for_federation=Mock(), replication_layer=self.mock_federation, + federation_registry=self.mock_registry, ratelimiter=NonCallableMock(spec_set=[ "send_message", ]) -- cgit 1.5.1 From e05bf34117de19705b36a4803085ea93f7381928 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 14:07:39 +0000 Subject: Move property setting from ReplicationLayer to FederationBase --- synapse/federation/federation_base.py | 6 ++++++ synapse/federation/federation_client.py | 1 + synapse/federation/federation_server.py | 6 ++++++ synapse/federation/replication.py | 22 ---------------------- 4 files changed, 13 insertions(+), 22 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 7918d3e442..79eaa31031 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -27,7 +27,13 @@ logger = logging.getLogger(__name__) class FederationBase(object): def __init__(self, hs): + self.hs = hs + + self.server_name = hs.hostname + self.keyring = hs.get_keyring() self.spam_checker = hs.get_spam_checker() + self.store = hs.get_datastore() + self._clock = hs.get_clock() @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 813907f7f2..38440da5b5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -58,6 +58,7 @@ class FederationClient(FederationBase): self._clear_tried_cache, 60 * 1000, ) self.state = hs.get_state_handler() + self.transport_layer = hs.get_federation_transport_client() def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5b1914f2f4..dd73fc50b2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -23,6 +23,8 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) + +from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction import synapse.metrics from synapse.types import get_domain_from_id @@ -56,6 +58,10 @@ class FederationServer(FederationBase): self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self.transaction_actions = TransactionActions(self.store) + + self.handler = None + self.registry = hs.get_federation_registry() # We cache responses to state queries, as they take a while and often diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 62d865ec4b..b8b3a3f933 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -20,8 +20,6 @@ a given transport. from .federation_client import FederationClient from .federation_server import FederationServer -from .persistence import TransactionActions - import logging @@ -47,26 +45,6 @@ class ReplicationLayer(FederationClient, FederationServer): """ def __init__(self, hs, transport_layer): - self.server_name = hs.hostname - - self.keyring = hs.get_keyring() - - self.transport_layer = transport_layer - - self.federation_client = self - - self.store = hs.get_datastore() - - self.handler = None - self.edu_handlers = {} - self.query_handlers = {} - - self._clock = hs.get_clock() - - self.transaction_actions = TransactionActions(self.store) - - self.hs = hs - super(ReplicationLayer, self).__init__(hs) def __str__(self): -- cgit 1.5.1 From 265b993b8afd2501b2aa3a50670f39d6d97eddb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 14:34:31 +0000 Subject: Split replication layer into two --- synapse/app/homeserver.py | 2 +- synapse/federation/federation_server.py | 10 +--------- synapse/federation/transport/server.py | 2 +- synapse/handlers/device.py | 3 +-- synapse/handlers/directory.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/federation.py | 4 +--- synapse/handlers/presence.py | 1 - synapse/handlers/profile.py | 2 +- synapse/handlers/room_list.py | 2 +- synapse/handlers/room_member.py | 3 +-- synapse/server.py | 13 +++++++++---- 12 files changed, 19 insertions(+), 27 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e375f2bbcf..503f461ab4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -348,7 +348,7 @@ def setup(config_options): hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - hs.get_replication_layer().start_get_pdu_cache() + hs.get_replication_client().start_get_pdu_cache() register_memory_metrics(hs) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index dd73fc50b2..740ef96280 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -54,27 +54,19 @@ class FederationServer(FederationBase): super(FederationServer, self).__init__(hs) self.auth = hs.get_auth() + self.handler = hs.get_handlers().federation_handler self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") self.transaction_actions = TransactionActions(self.store) - self.handler = None - self.registry = hs.get_federation_registry() # We cache responses to state queries, as they take a while and often # come in waves. self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) - def set_handler(self, handler): - """Sets the handler that the replication layer will use to communicate - receipt of new PDUs from other home servers. The required methods are - documented on :py:class:`.ReplicationHandler`. - """ - self.handler = handler - @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 06c16ba4fa..04b83e691a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( def register_servlets(hs, resource, authenticator, ratelimiter): for servletclass in FEDERATION_SERVLET_CLASSES: servletclass( - handler=hs.get_replication_layer(), + handler=hs.get_replication_server(), authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e58dbe64e..fcf41630d6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() self.federation_sender = hs.get_federation_sender() - self.federation = hs.get_replication_layer() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -432,7 +431,7 @@ class DeviceListEduUpdater(object): def __init__(self, hs, device_handler): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() self.clock = hs.get_clock() self.device_handler = device_handler diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index e955cb1f3c..dfe04eb1c1 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 57f50a4e27..0ca8d036ee 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() self.device_handler = hs.get_device_handler() self.is_mine = hs.is_mine self.clock = hs.get_clock() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 520612683e..cfd4379160 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,7 +68,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() + self.replication_layer = hs.get_replication_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -78,8 +78,6 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() - self.replication_layer.set_handler(self) - # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b11ae78350..a5e501897c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -93,7 +93,6 @@ class PresenceHandler(object): self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.replication = hs.get_replication_layer() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c386c79bbd..0cfac60d74 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler): def __init__(self, hs): super(ProfileHandler, self).__init__(hs) - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dfa09141ed..f79bd8902f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler): def _get_remote_list_cached(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): - repl_layer = self.hs.get_replication_layer() + repl_layer = self.hs.get_replication_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed3b97730d..e2f0527712 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -55,7 +55,6 @@ class RoomMemberHandler(object): self.registration_handler = hs.get_handlers().registration_handler self.profile_handler = hs.get_profile_handler() self.event_creation_hander = hs.get_event_creation_handler() - self.replication_layer = hs.get_replication_layer() self.member_linearizer = Linearizer(name="member") @@ -212,7 +211,7 @@ class RoomMemberHandler(object): # if this is a join with a 3pid signature, we may need to turn a 3pid # invite into a normal invite before we can handle the join. if third_party_signed is not None: - yield self.replication_layer.exchange_third_party_invite( + yield self.federation_handler.exchange_third_party_invite( third_party_signed["sender"], target.to_string(), room_id, diff --git a/synapse/server.py b/synapse/server.py index 1bc8d6f702..894e9c2acf 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker -from synapse.federation import initialize_http_replication +from synapse.federation.federation_client import FederationClient +from synapse.federation.federation_server import FederationServer from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.transport.client import TransportLayerClient @@ -100,7 +101,8 @@ class HomeServer(object): DEPENDENCIES = [ 'http_client', 'db_pool', - 'replication_layer', + 'replication_client', + 'replication_server', 'handlers', 'v1auth', 'auth', @@ -197,8 +199,11 @@ class HomeServer(object): def get_ratelimiter(self): return self.ratelimiter - def build_replication_layer(self): - return initialize_http_replication(self) + def build_replication_client(self): + return FederationClient(self) + + def build_replication_server(self): + return FederationServer(self) def build_handlers(self): return Handlers(self) -- cgit 1.5.1 From 6ea27fafad7c290b8f082fedfa8ff7948cf9f1fd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 10:15:49 +0000 Subject: Fix tests --- tests/handlers/test_directory.py | 2 +- tests/handlers/test_e2e_keys.py | 2 +- tests/handlers/test_profile.py | 3 ++- tests/handlers/test_typing.py | 2 +- tests/replication/slave/storage/_base.py | 2 +- tests/rest/client/v1/test_events.py | 2 +- tests/rest/client/v1/test_profile.py | 2 +- tests/rest/client/v1/test_rooms.py | 16 ++++++++-------- tests/rest/client/v1/test_typing.py | 2 +- tests/storage/test_appservice.py | 10 +++++----- 10 files changed, 22 insertions(+), 21 deletions(-) diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index b103921498..b4f36b27a6 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -47,7 +47,7 @@ class DirectoryTestCase(unittest.TestCase): hs = yield setup_test_homeserver( http_client=None, resource_for_federation=Mock(), - replication_layer=self.mock_federation, + replication_client=self.mock_federation, federation_registry=self.mock_registry, ) hs.handlers = DirectoryHandlers(hs) diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index d92bf240b1..fe73f2b96c 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -34,7 +34,7 @@ class E2eKeysHandlerTestCase(unittest.TestCase): def setUp(self): self.hs = yield utils.setup_test_homeserver( handlers=None, - replication_layer=mock.Mock(), + replication_client=mock.Mock(), ) self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 73223ffbd3..c690437682 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -51,7 +51,8 @@ class ProfileTestCase(unittest.TestCase): http_client=None, handlers=None, resource_for_federation=Mock(), - replication_layer=self.mock_federation, + replication_client=self.mock_federation, + replication_server=Mock(), federation_registry=self.mock_registry, ratelimiter=NonCallableMock(spec_set=[ "send_message", diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index fcd380b03a..a433bbfa8a 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -81,7 +81,7 @@ class TypingNotificationsTestCase(unittest.TestCase): "get_current_state_deltas", ]), state_handler=self.state_handler, - handlers=None, + handlers=Mock(), notifier=mock_notifier, resource_for_client=Mock(), resource_for_federation=self.mock_federation_resource, diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 74f104e3b8..ceffdaad54 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -31,7 +31,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.hs = yield setup_test_homeserver( "blue", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index e9698bfdc9..f04bf7dfde 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -114,7 +114,7 @@ class EventStreamPermissionsTestCase(RestTestCase): hs = yield setup_test_homeserver( http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index dddcf51b69..feddcf024e 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -45,7 +45,7 @@ class ProfileTestCase(unittest.TestCase): http_client=None, resource_for_client=self.mock_resource, federation=Mock(), - replication_layer=Mock(), + replication_client=Mock(), profile_handler=self.mock_handler ) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 9f37255381..2c0708b0d8 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -46,7 +46,7 @@ class RoomPermissionsTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -409,7 +409,7 @@ class RoomsMemberListTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -493,7 +493,7 @@ class RoomsCreateTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -582,7 +582,7 @@ class RoomTopicTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -697,7 +697,7 @@ class RoomMemberStateTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -829,7 +829,7 @@ class RoomMessagesTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -929,7 +929,7 @@ class RoomInitialSyncTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), @@ -1003,7 +1003,7 @@ class RoomMessageListTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index e46534cd35..62639e3adc 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -47,7 +47,7 @@ class RoomTypingTestCase(RestTestCase): "red", clock=self.clock, http_client=None, - replication_layer=Mock(), + replication_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 13d81f972b..cc0df7f662 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -42,7 +42,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, federation_sender=Mock(), - replication_layer=Mock(), + replication_client=Mock(), ) self.as_token = "token1" @@ -119,7 +119,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, federation_sender=Mock(), - replication_layer=Mock(), + replication_client=Mock(), ) self.db_pool = hs.get_db_pool() @@ -455,7 +455,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_layer=Mock(), + replication_client=Mock(), ) ApplicationServiceStore(None, hs) @@ -473,7 +473,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_layer=Mock(), + replication_client=Mock(), ) with self.assertRaises(ConfigError) as cm: @@ -497,7 +497,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_layer=Mock(), + replication_client=Mock(), ) with self.assertRaises(ConfigError) as cm: -- cgit 1.5.1 From ea7b3c4b1b829703a780418e6bb6b7860a5b5451 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 11:00:04 +0000 Subject: Remove unused ReplicationLayer --- synapse/federation/__init__.py | 8 ------ synapse/federation/replication.py | 51 --------------------------------------- 2 files changed, 59 deletions(-) delete mode 100644 synapse/federation/replication.py diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py index 2e32d245ba..f5f0bdfca3 100644 --- a/synapse/federation/__init__.py +++ b/synapse/federation/__init__.py @@ -15,11 +15,3 @@ """ This package includes all the federation specific logic. """ - -from .replication import ReplicationLayer - - -def initialize_http_replication(hs): - transport = hs.get_federation_transport_client() - - return ReplicationLayer(hs, transport) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py deleted file mode 100644 index b8b3a3f933..0000000000 --- a/synapse/federation/replication.py +++ /dev/null @@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This layer is responsible for replicating with remote home servers using -a given transport. -""" - -from .federation_client import FederationClient -from .federation_server import FederationServer - -import logging - - -logger = logging.getLogger(__name__) - - -class ReplicationLayer(FederationClient, FederationServer): - """This layer is responsible for replicating with remote home servers over - the given transport. I.e., does the sending and receiving of PDUs to - remote home servers. - - The layer communicates with the rest of the server via a registered - ReplicationHandler. - - In more detail, the layer: - * Receives incoming data and processes it into transactions and pdus. - * Fetches any PDUs it thinks it might have missed. - * Keeps the current state for contexts up to date by applying the - suitable conflict resolution. - * Sends outgoing pdus wrapped in transactions. - * Fills out the references to previous pdus/transactions appropriately - for outgoing data. - """ - - def __init__(self, hs, transport_layer): - super(ReplicationLayer, self).__init__(hs) - - def __str__(self): - return "" % self.server_name -- cgit 1.5.1 From d023ecb8109718cd95c4dd86e916a459fc610547 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 11:08:10 +0000 Subject: Don't build handlers on workers unnecessarily --- synapse/app/client_reader.py | 1 - synapse/app/event_creator.py | 1 - synapse/app/federation_reader.py | 1 - synapse/app/frontend_proxy.py | 1 - synapse/app/media_repository.py | 1 - 5 files changed, 5 deletions(-) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 3b3352798d..0a8ce9bc66 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -156,7 +156,6 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) def start(): diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index fc0b9e8c04..eb593c5278 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -161,7 +161,6 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) def start(): diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 4de43c41f0..20d157911b 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -144,7 +144,6 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) def start(): diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index e32ee8fe93..816c080d18 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -211,7 +211,6 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) def start(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 1ed1ca8772..84c5791b3b 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -158,7 +158,6 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) def start(): -- cgit 1.5.1 From 31becf4ac3a1c8c675ecab481b07cffb9aa24fd8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 10:28:52 +0000 Subject: Make functions private that can be --- synapse/handlers/room_member.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed3b97730d..2a6b7e9f8c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -138,7 +138,7 @@ class RoomMemberHandler(object): defer.returnValue(event) @defer.inlineCallbacks - def remote_join(self, remote_room_hosts, room_id, user, content): + def _remote_join(self, remote_room_hosts, room_id, user, content): if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") @@ -292,7 +292,7 @@ class RoomMemberHandler(object): raise AuthError(403, "Guest access not allowed") if not is_host_in_room: - inviter = yield self.get_inviter(target.to_string(), room_id) + inviter = yield self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): remote_room_hosts.append(inviter.domain) @@ -306,7 +306,7 @@ class RoomMemberHandler(object): if requester.is_guest: content["kind"] = "guest" - ret = yield self.remote_join( + ret = yield self._remote_join( remote_room_hosts, room_id, target, content ) defer.returnValue(ret) @@ -314,7 +314,7 @@ class RoomMemberHandler(object): elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: # perhaps we've been invited - inviter = yield self.get_inviter(target.to_string(), room_id) + inviter = yield self._get_inviter(target.to_string(), room_id) if not inviter: raise SynapseError(404, "Not a known room") @@ -496,7 +496,7 @@ class RoomMemberHandler(object): defer.returnValue((RoomID.from_string(room_id), servers)) @defer.inlineCallbacks - def get_inviter(self, user_id, room_id): + def _get_inviter(self, user_id, room_id): invite = yield self.store.get_invite_for_user_in_room( user_id=user_id, room_id=room_id, @@ -573,7 +573,7 @@ class RoomMemberHandler(object): if "mxid" in data: if "signatures" not in data: raise AuthError(401, "No signatures on 3pid binding") - yield self.verify_any_signature(data, id_server) + yield self._verify_any_signature(data, id_server) defer.returnValue(data["mxid"]) except IOError as e: @@ -581,7 +581,7 @@ class RoomMemberHandler(object): defer.returnValue(None) @defer.inlineCallbacks - def verify_any_signature(self, data, server_hostname): + def _verify_any_signature(self, data, server_hostname): if server_hostname not in data["signatures"]: raise AuthError(401, "No signature from server %s" % (server_hostname,)) for key_name, signature in data["signatures"][server_hostname].items(): -- cgit 1.5.1 From d0fcc48f9dfc09531619faf23d407807eec46df9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 17:39:58 +0000 Subject: extra_users is actually a list of UserIDs --- synapse/handlers/message.py | 2 +- synapse/replication/http/send_event.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 42aab91c50..4f97c8db79 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -667,7 +667,7 @@ class EventCreationHandler(object): event (FrozenEvent) context (EventContext) ratelimit (bool) - extra_users (list(str)): Any extra users to notify about event + extra_users (list(UserID)): Any extra users to notify about event """ try: diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 70f2fe456a..bbe2f967b7 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -25,7 +25,7 @@ from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.metrics import Measure -from synapse.types import Requester +from synapse.types import Requester, UserID import logging import re @@ -46,7 +46,7 @@ def send_event_to_master(client, host, port, requester, event, context, event (FrozenEvent) context (EventContext) ratelimit (bool) - extra_users (list(str)): Any extra users to notify about event + extra_users (list(UserID)): Any extra users to notify about event """ uri = "http://%s:%s/_synapse/replication/send_event/%s" % ( host, port, event.event_id, @@ -59,7 +59,7 @@ def send_event_to_master(client, host, port, requester, event, context, "context": context.serialize(event), "requester": requester.serialize(), "ratelimit": ratelimit, - "extra_users": extra_users, + "extra_users": [u.to_string() for u in extra_users], } try: @@ -143,7 +143,7 @@ class ReplicationSendEventRestServlet(RestServlet): context = yield EventContext.deserialize(self.store, content["context"]) ratelimit = content["ratelimit"] - extra_users = content["extra_users"] + extra_users = [UserID.from_string(u) for u in content["extra_users"]] if requester.user: request.authenticated_entity = requester.user.to_string() -- cgit 1.5.1 From 0f942f68c106b9d0fb89d0eaef9fa942b6d003ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 11:31:11 +0000 Subject: Factor out _remote_reject_invite in RoomMember --- synapse/handlers/room_member.py | 50 +++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed3b97730d..6c8acfbf09 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -154,6 +154,30 @@ class RoomMemberHandler(object): ) yield user_joined_room(self.distributor, user, room_id) + @defer.inlineCallbacks + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + fed_handler = self.federation_handler + try: + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), + ) + defer.returnValue(ret) + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + defer.returnValue({}) + @defer.inlineCallbacks def update_membership( self, @@ -328,28 +352,10 @@ class RoomMemberHandler(object): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - fed_handler = self.federation_handler - try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - target.to_string(), - ) - defer.returnValue(ret) - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warn("Failed to reject invite: %s", e) - - yield self.store.locally_reject_invite( - target.to_string(), room_id - ) - - defer.returnValue({}) + res = yield self._remote_reject_invite( + remote_room_hosts, room_id, target, + ) + defer.returnValue(res) res = yield self._local_membership_update( requester=requester, -- cgit 1.5.1 From f43b6d6d9b7e6f6a94ba3e1886cec6ca864fad43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 11:29:17 +0000 Subject: Fix docstring types --- synapse/federation/federation_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5b1914f2f4..90302c9537 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -581,7 +581,7 @@ class FederationHandlerRegistry(object): Args: edu_type (str): The type of the incoming EDU to register handler for - handler (Callable[str, dict]): A callable invoked on incoming EDU + handler (Callable[[str, dict]]): A callable invoked on incoming EDU of the given type. The arguments are the origin server name and the EDU contents. """ @@ -597,7 +597,7 @@ class FederationHandlerRegistry(object): Args: query_type (str): Category name of the query, which should match the string used by make_query. - handler (Callable[dict] -> Deferred[dict]): Invoked to handle + handler (Callable[[dict], Deferred[dict]]): Invoked to handle incoming queries of this type. The return will be yielded on and the result used as the response to the query request. """ -- cgit 1.5.1 From 8b3573a8b209c60b03d5ef7f4dfed9ccb9e9f7b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 12:04:38 +0000 Subject: Refactor get_or_register_3pid_guest --- synapse/handlers/register.py | 26 ++++++++++++++++++++++---- synapse/handlers/room_member.py | 10 +++------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 9021d4d57f..ed5939880a 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -446,16 +446,34 @@ class RegistrationHandler(BaseHandler): return self.hs.get_auth_handler() @defer.inlineCallbacks - def guest_access_token_for(self, medium, address, inviter_user_id): + def get_or_register_3pid_guest(self, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ access_token = yield self.store.get_3pid_guest_access_token(medium, address) if access_token: - defer.returnValue(access_token) + user_info = yield self.auth.get_user_by_access_token( + access_token + ) - _, access_token = yield self.register( + defer.returnValue((user_info["user"].to_string(), access_token)) + + user_id, access_token = yield self.register( generate_token=True, make_guest=True ) access_token = yield self.store.save_or_get_3pid_guest_access_token( medium, address, access_token, inviter_user_id ) - defer.returnValue(access_token) + + defer.returnValue((user_id, access_token)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed3b97730d..c3c720536e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -735,20 +735,16 @@ class RoomMemberHandler(object): } if self.config.invite_3pid_guest: - registration_handler = self.registration_handler - guest_access_token = yield registration_handler.guest_access_token_for( + rh = self.registration_handler + guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest( medium=medium, address=address, inviter_user_id=inviter_user_id, ) - guest_user_info = yield self.auth.get_user_by_access_token( - guest_access_token - ) - invite_config.update({ "guest_access_token": guest_access_token, - "guest_user_id": guest_user_info["user"].to_string(), + "guest_user_id": guest_user_id, }) data = yield self.simple_http_client.post_urlencoded_get_json( -- cgit 1.5.1 From f5160d4a3e559ba23f3e6002a8f9172dff4b3d60 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 12:12:55 +0000 Subject: RoomMembershipRestServlet doesn't handle /forget Due to the order we register the REST handlers `/forget` was handled by the correct handler. --- synapse/rest/client/v1/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 9d745174c7..f8999d64d7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -599,7 +599,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet): def register(self, http_server): # /rooms/$roomid/[invite|join|leave] PATTERNS = ("/rooms/(?P[^/]*)/" - "(?Pjoin|invite|leave|ban|unban|kick|forget)") + "(?Pjoin|invite|leave|ban|unban|kick)") register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks -- cgit 1.5.1 From ea3442c15c32ba98c407c71722cb80821d99d160 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:16:21 +0000 Subject: Add docstring --- synapse/handlers/room_member.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6c8acfbf09..da35e604d0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -139,6 +139,19 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def remote_join(self, remote_room_hosts, room_id, user, content): + """Try and join a room that this server is not in + + Args: + remote_room_hosts (list[str]): List of servers that can be used + to join via. + room_id (str): Room that we are trying to join + user (UserID): User who is trying to join + content (dict): A dict that should be used as the content of the + join event. + + Returns: + Deferred + """ if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") @@ -156,6 +169,19 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Attempt to reject an invite for a room this server is not in. If we + fail to do so we locally mark the invite as rejected. + + Args: + remote_room_hosts (list[str]): List of servers to use to try and + reject invite + room_id (str) + target (UserID): The user rejecting the invite + + Returns: + Deferred[dict]: A dictionary to be returned to the client, may + include event_id etc, or nothing if we locally rejected + """ fed_handler = self.federation_handler try: ret = yield fed_handler.do_remotely_reject_invite( -- cgit 1.5.1 From cea462e2857d3af2007521f131adb46e4f5ce6fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:22:21 +0000 Subject: s/replication_server/federation_server --- synapse/federation/transport/server.py | 2 +- synapse/server.py | 4 ++-- tests/handlers/test_profile.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 04b83e691a..a66a6b0692 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( def register_servlets(hs, resource, authenticator, ratelimiter): for servletclass in FEDERATION_SERVLET_CLASSES: servletclass( - handler=hs.get_replication_server(), + handler=hs.get_federation_server(), authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, diff --git a/synapse/server.py b/synapse/server.py index 894e9c2acf..802a793848 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -102,7 +102,7 @@ class HomeServer(object): 'http_client', 'db_pool', 'replication_client', - 'replication_server', + 'federation_server', 'handlers', 'v1auth', 'auth', @@ -202,7 +202,7 @@ class HomeServer(object): def build_replication_client(self): return FederationClient(self) - def build_replication_server(self): + def build_federation_server(self): return FederationServer(self) def build_handlers(self): diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index c690437682..f9f828471a 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -52,7 +52,7 @@ class ProfileTestCase(unittest.TestCase): handlers=None, resource_for_federation=Mock(), replication_client=self.mock_federation, - replication_server=Mock(), + federation_server=Mock(), federation_registry=self.mock_registry, ratelimiter=NonCallableMock(spec_set=[ "send_message", -- cgit 1.5.1 From cb9f8e527c09315eea05955ec970154ea2fb9729 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:26:52 +0000 Subject: s/replication_client/federation_client/ --- synapse/app/homeserver.py | 2 +- synapse/handlers/device.py | 2 +- synapse/handlers/directory.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/profile.py | 2 +- synapse/handlers/room_list.py | 2 +- synapse/server.py | 4 ++-- tests/handlers/test_directory.py | 2 +- tests/handlers/test_e2e_keys.py | 2 +- tests/handlers/test_profile.py | 2 +- tests/replication/slave/storage/_base.py | 2 +- tests/rest/client/v1/test_events.py | 2 +- tests/rest/client/v1/test_profile.py | 2 +- tests/rest/client/v1/test_rooms.py | 16 ++++++++-------- tests/rest/client/v1/test_typing.py | 2 +- tests/storage/test_appservice.py | 10 +++++----- 17 files changed, 29 insertions(+), 29 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 503f461ab4..e477c7ced6 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -348,7 +348,7 @@ def setup(config_options): hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - hs.get_replication_client().start_get_pdu_cache() + hs.get_federation_client().start_get_pdu_cache() register_memory_metrics(hs) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index fcf41630d6..40f3d24678 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -431,7 +431,7 @@ class DeviceListEduUpdater(object): def __init__(self, hs, device_handler): self.store = hs.get_datastore() - self.federation = hs.get_replication_client() + self.federation = hs.get_federation_client() self.clock = hs.get_clock() self.device_handler = device_handler diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index dfe04eb1c1..c5b6e75e03 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() - self.federation = hs.get_replication_client() + self.federation = hs.get_federation_client() hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 0ca8d036ee..31b1ece13e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - self.federation = hs.get_replication_client() + self.federation = hs.get_federation_client() self.device_handler = hs.get_device_handler() self.is_mine = hs.is_mine self.clock = hs.get_clock() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cfd4379160..080aca3d71 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,7 +68,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_client() + self.replication_layer = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 0cfac60d74..cb710fe796 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler): def __init__(self, hs): super(ProfileHandler, self).__init__(hs) - self.federation = hs.get_replication_client() + self.federation = hs.get_federation_client() hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index f79bd8902f..5d81f59b44 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler): def _get_remote_list_cached(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): - repl_layer = self.hs.get_replication_client() + repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( diff --git a/synapse/server.py b/synapse/server.py index 802a793848..43c6e0a6d6 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -101,7 +101,7 @@ class HomeServer(object): DEPENDENCIES = [ 'http_client', 'db_pool', - 'replication_client', + 'federation_client', 'federation_server', 'handlers', 'v1auth', @@ -199,7 +199,7 @@ class HomeServer(object): def get_ratelimiter(self): return self.ratelimiter - def build_replication_client(self): + def build_federation_client(self): return FederationClient(self) def build_federation_server(self): diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index b4f36b27a6..7e5332e272 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -47,7 +47,7 @@ class DirectoryTestCase(unittest.TestCase): hs = yield setup_test_homeserver( http_client=None, resource_for_federation=Mock(), - replication_client=self.mock_federation, + federation_client=self.mock_federation, federation_registry=self.mock_registry, ) hs.handlers = DirectoryHandlers(hs) diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index fe73f2b96c..d1bd87b898 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -34,7 +34,7 @@ class E2eKeysHandlerTestCase(unittest.TestCase): def setUp(self): self.hs = yield utils.setup_test_homeserver( handlers=None, - replication_client=mock.Mock(), + federation_client=mock.Mock(), ) self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index f9f828471a..458296ee4c 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -51,7 +51,7 @@ class ProfileTestCase(unittest.TestCase): http_client=None, handlers=None, resource_for_federation=Mock(), - replication_client=self.mock_federation, + federation_client=self.mock_federation, federation_server=Mock(), federation_registry=self.mock_registry, ratelimiter=NonCallableMock(spec_set=[ diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index ceffdaad54..64e07a8c93 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -31,7 +31,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.hs = yield setup_test_homeserver( "blue", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index f04bf7dfde..2b89c0a3c7 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -114,7 +114,7 @@ class EventStreamPermissionsTestCase(RestTestCase): hs = yield setup_test_homeserver( http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index feddcf024e..deac7f100c 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -45,7 +45,7 @@ class ProfileTestCase(unittest.TestCase): http_client=None, resource_for_client=self.mock_resource, federation=Mock(), - replication_client=Mock(), + federation_client=Mock(), profile_handler=self.mock_handler ) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 2c0708b0d8..7e8966a1a8 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -46,7 +46,7 @@ class RoomPermissionsTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -409,7 +409,7 @@ class RoomsMemberListTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -493,7 +493,7 @@ class RoomsCreateTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -582,7 +582,7 @@ class RoomTopicTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -697,7 +697,7 @@ class RoomMemberStateTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -829,7 +829,7 @@ class RoomMessagesTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() @@ -929,7 +929,7 @@ class RoomInitialSyncTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), @@ -1003,7 +1003,7 @@ class RoomMessageListTestCase(RestTestCase): hs = yield setup_test_homeserver( "red", http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.ratelimiter = hs.get_ratelimiter() diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 62639e3adc..2ec4ecab5b 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -47,7 +47,7 @@ class RoomTypingTestCase(RestTestCase): "red", clock=self.clock, http_client=None, - replication_client=Mock(), + federation_client=Mock(), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index cc0df7f662..c2e39a7288 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -42,7 +42,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, federation_sender=Mock(), - replication_client=Mock(), + federation_client=Mock(), ) self.as_token = "token1" @@ -119,7 +119,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): hs = yield setup_test_homeserver( config=config, federation_sender=Mock(), - replication_client=Mock(), + federation_client=Mock(), ) self.db_pool = hs.get_db_pool() @@ -455,7 +455,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_client=Mock(), + federation_client=Mock(), ) ApplicationServiceStore(None, hs) @@ -473,7 +473,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_client=Mock(), + federation_client=Mock(), ) with self.assertRaises(ConfigError) as cm: @@ -497,7 +497,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): config=config, datastore=Mock(), federation_sender=Mock(), - replication_client=Mock(), + federation_client=Mock(), ) with self.assertRaises(ConfigError) as cm: -- cgit 1.5.1 From b78717b87ba3fc248838db522f007c13d0cd8c76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:49:13 +0000 Subject: Split RoomMemberHandler into base and master class The intention here is to split the class into the bits that can be done on workers and the bits that have to be done on the master. In future there will also be a class that can be run on the worker, which will delegate work to the master when necessary. --- synapse/handlers/room_member.py | 231 +++++++++++++++++++++++----------------- synapse/server.py | 6 +- 2 files changed, 139 insertions(+), 98 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6ee8420d1f..ce83d56451 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from twisted.internet import defer @@ -31,6 +28,10 @@ from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room +import abc +import logging + + logger = logging.getLogger(__name__) id_server_scheme = "https://" @@ -42,6 +43,8 @@ class RoomMemberHandler(object): # API that takes ID strings and returns pagination chunks. These concerns # ought to be separated out a lot better. + __metaclass__ = abc.ABCMeta + def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() @@ -62,9 +65,56 @@ class RoomMemberHandler(object): self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() - self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") - self.distributor.declare("user_left_room") + @abc.abstractmethod + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Try and join a room that this server is not in + + Args: + remote_room_hosts (list[str]): List of servers that can be used + to join via. + room_id (str): Room that we are trying to join + user (UserID): User who is trying to join + content (dict): A dict that should be used as the content of the + join event. + + Returns: + Deferred + """ + raise NotImplementedError() + + @abc.abstractmethod + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Attempt to reject an invite for a room this server is not in. If we + fail to do so we locally mark the invite as rejected. + + Args: + remote_room_hosts (list[str]): List of servers to use to try and + reject invite + room_id (str) + target (UserID): The user rejecting the invite + + Returns: + Deferred[dict]: A dictionary to be returned to the client, may + include event_id etc, or nothing if we locally rejected + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + raise NotImplementedError() @defer.inlineCallbacks def _local_membership_update( @@ -137,73 +187,6 @@ class RoomMemberHandler(object): defer.returnValue(event) - @defer.inlineCallbacks - def _remote_join(self, remote_room_hosts, room_id, user, content): - """Try and join a room that this server is not in - - Args: - remote_room_hosts (list[str]): List of servers that can be used - to join via. - room_id (str): Room that we are trying to join - user (UserID): User who is trying to join - content (dict): A dict that should be used as the content of the - join event. - - Returns: - Deferred - """ - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( - remote_room_hosts, - room_id, - user.to_string(), - content, - ) - yield user_joined_room(self.distributor, user, room_id) - - @defer.inlineCallbacks - def _remote_reject_invite(self, remote_room_hosts, room_id, target): - """Attempt to reject an invite for a room this server is not in. If we - fail to do so we locally mark the invite as rejected. - - Args: - remote_room_hosts (list[str]): List of servers to use to try and - reject invite - room_id (str) - target (UserID): The user rejecting the invite - - Returns: - Deferred[dict]: A dictionary to be returned to the client, may - include event_id etc, or nothing if we locally rejected - """ - fed_handler = self.federation_handler - try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - target.to_string(), - ) - defer.returnValue(ret) - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warn("Failed to reject invite: %s", e) - - yield self.store.locally_reject_invite( - target.to_string(), room_id - ) - defer.returnValue({}) - @defer.inlineCallbacks def update_membership( self, @@ -673,6 +656,7 @@ class RoomMemberHandler(object): token, public_keys, fallback_public_key, display_name = ( yield self._ask_id_server_for_third_party_invite( + requester=requester, id_server=id_server, medium=medium, address=address, @@ -709,6 +693,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( self, + requester, id_server, medium, address, @@ -725,6 +710,7 @@ class RoomMemberHandler(object): Asks an identity server for a third party invite. Args: + requester (Requester) id_server (str): hostname + optional port for the identity server. medium (str): The literal string "email". address (str): The third party address being invited. @@ -767,8 +753,8 @@ class RoomMemberHandler(object): } if self.config.invite_3pid_guest: - rh = self.registration_handler - guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest( + guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest( + requester=requester, medium=medium, address=address, inviter_user_id=inviter_user_id, @@ -801,27 +787,6 @@ class RoomMemberHandler(object): display_name = data["display_name"] defer.returnValue((token, public_keys, fallback_public_key, display_name)) - @defer.inlineCallbacks - def forget(self, user, room_id): - user_id = user.to_string() - - member = yield self.state_handler.get_current_state( - room_id=room_id, - event_type=EventTypes.Member, - state_key=user_id - ) - membership = member.membership if member else None - - if membership is not None and membership not in [ - Membership.LEAVE, Membership.BAN - ]: - raise SynapseError(400, "User %s in room %s" % ( - user_id, room_id - )) - - if membership: - yield self.store.forget(user_id, room_id) - @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): # Have we just created the room, and is this about to be the very @@ -843,3 +808,77 @@ class RoomMemberHandler(object): defer.returnValue(True) defer.returnValue(False) + + +class RoomMemberMasterHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + @defer.inlineCallbacks + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + fed_handler = self.federation_handler + try: + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), + ) + defer.returnValue(ret) + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + defer.returnValue({}) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + rg = self.registration_handler + return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + + @defer.inlineCallbacks + def forget(self, user, room_id): + user_id = user.to_string() + + member = yield self.state_handler.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: + raise SynapseError(400, "User %s in room %s" % ( + user_id, room_id + )) + + if membership: + yield self.store.forget(user_id, room_id) diff --git a/synapse/server.py b/synapse/server.py index 1bc8d6f702..3d47e2793b 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -46,7 +46,7 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler +from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler @@ -387,7 +387,9 @@ class HomeServer(object): return SpamChecker(self) def build_room_member_handler(self): - return RoomMemberHandler(self) + if self.config.worker_app: + return Exception("Can't use RoomMemberHandler on workers") + return RoomMemberMasterHandler(self) def build_federation_registry(self): return FederationHandlerRegistry() -- cgit 1.5.1 From 82f16faa78864a23653ca952072a6f0e906f3367 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 16:00:26 +0000 Subject: Move user_*_room distributor stuff to master class I added yields when calling user_left_room, but they shouldn't matter on the master process as they always return None anyway. --- synapse/handlers/room_member.py | 55 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ce83d56451..669210b73d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -116,6 +116,34 @@ class RoomMemberHandler(object): """ raise NotImplementedError() + @abc.abstractmethod + def _user_joined_room(self, target, room_id): + """Notifies distributor on master process that the user has joined the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_left_room(self, target, room_id): + """Notifies distributor on master process that the user has left the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, @@ -178,12 +206,12 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target, room_id) + yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target, room_id) + yield self._user_left_room(target, room_id) defer.returnValue(event) @@ -460,12 +488,12 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target_user, room_id) + yield self._user_joined_room(target_user, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target_user, room_id) + yield self._user_left_room(target_user, room_id) @defer.inlineCallbacks def _can_guest_join(self, current_state_ids): @@ -811,6 +839,13 @@ class RoomMemberHandler(object): class RoomMemberMasterHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberMasterHandler, self).__init__(hs) + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") + @defer.inlineCallbacks def _remote_join(self, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join @@ -828,7 +863,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): user.to_string(), content, ) - yield user_joined_room(self.distributor, user, room_id) + yield self._user_joined_room(user, room_id) @defer.inlineCallbacks def _remote_reject_invite(self, remote_room_hosts, room_id, target): @@ -862,6 +897,16 @@ class RoomMemberMasterHandler(RoomMemberHandler): rg = self.registration_handler return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return user_joined_room(self.distributor, target, room_id) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return user_left_room(self.distributor, target, room_id) + @defer.inlineCallbacks def forget(self, user, room_id): user_id = user.to_string() -- cgit 1.5.1 From 16adb11cc0aa255f0be98d6446d629e18a0dcefe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 16:57:07 +0000 Subject: Correct import order --- synapse/handlers/room_member.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 669210b73d..78b5feee4f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc +import logging + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from twisted.internet import defer @@ -28,9 +31,6 @@ from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room -import abc -import logging - logger = logging.getLogger(__name__) -- cgit 1.5.1 From 6dbebef1415fd4d0da29ebf6b90a16b9cc877b96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:15:32 +0000 Subject: Add missing param to docstrings --- synapse/handlers/room_member.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 78b5feee4f..790e11c57c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(object): """Try and join a room that this server is not in Args: + requester (Requester) remote_room_hosts (list[str]): List of servers that can be used to join via. room_id (str): Room that we are trying to join @@ -88,6 +89,7 @@ class RoomMemberHandler(object): fail to do so we locally mark the invite as rejected. Args: + requester (Requester) remote_room_hosts (list[str]): List of servers to use to try and reject invite room_id (str) @@ -105,6 +107,7 @@ class RoomMemberHandler(object): one doesn't already exist. Args: + requester (Requester) medium (str) address (str) inviter_user_id (str): The user ID who is trying to invite the -- cgit 1.5.1 From d45a1148245839c5f9776fee6d463ed69981d729 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:24:34 +0000 Subject: Raise, don't return, exception --- synapse/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/server.py b/synapse/server.py index 3d47e2793b..6ffe900b74 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -388,7 +388,7 @@ class HomeServer(object): def build_room_member_handler(self): if self.config.worker_app: - return Exception("Can't use RoomMemberHandler on workers") + raise Exception("Can't use RoomMemberHandler on workers") return RoomMemberMasterHandler(self) def build_federation_registry(self): -- cgit 1.5.1 From 3518d0ea8fc4cf9247790fde587e37e5d129ddb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:36:50 +0000 Subject: Split up ProfileStore --- synapse/replication/slave/storage/profile.py | 21 ++++++++++++ synapse/storage/profile.py | 50 +++++++++++++++------------- 2 files changed, 47 insertions(+), 24 deletions(-) create mode 100644 synapse/replication/slave/storage/profile.py diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py new file mode 100644 index 0000000000..46c28d4171 --- /dev/null +++ b/synapse/replication/slave/storage/profile.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.storage.profile import ProfileWorkerStore + + +class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore): + pass diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index ec02e73bc2..8612bd5ecc 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,14 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore -class ProfileStore(SQLBaseStore): - def create_profile(self, user_localpart): - return self._simple_insert( - table="profiles", - values={"user_id": user_localpart}, - desc="create_profile", - ) - +class ProfileWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_profileinfo(self, user_localpart): try: @@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore): desc="get_profile_displayname", ) - def set_profile_displayname(self, user_localpart, new_displayname): - return self._simple_update_one( - table="profiles", - keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, - desc="set_profile_displayname", - ) - def get_profile_avatar_url(self, user_localpart): return self._simple_select_one_onecol( table="profiles", @@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore): desc="get_profile_avatar_url", ) - def set_profile_avatar_url(self, user_localpart, new_avatar_url): - return self._simple_update_one( - table="profiles", - keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, - desc="set_profile_avatar_url", - ) - def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", @@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore): desc="get_from_remote_profile_cache", ) + +class ProfileStore(ProfileWorkerStore): + def create_profile(self, user_localpart): + return self._simple_insert( + table="profiles", + values={"user_id": user_localpart}, + desc="create_profile", + ) + + def set_profile_displayname(self, user_localpart, new_displayname): + return self._simple_update_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", + ) + + def set_profile_avatar_url(self, user_localpart, new_avatar_url): + return self._simple_update_one( + table="profiles", + keyvalues={"user_id": user_localpart}, + updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", + ) + def add_remote_profile_cache(self, user_id, displayname, avatar_url): """Ensure we are caching the remote user's profiles. -- cgit 1.5.1 From df8ff682a72362acde7d99f7235ad4d4f87817e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:38:21 +0000 Subject: Only update remote profile cache on master --- synapse/handlers/profile.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb710fe796..3465a787ab 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -38,7 +38,10 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() - self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + if hs.config.worker_app is None: + self.clock.looping_call( + self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + ) @defer.inlineCallbacks def get_profile(self, user_id): -- cgit 1.5.1 From 350331d4664467358469e7d6161f9747707239ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 17:50:39 +0000 Subject: _remote_join and co take a requester --- synapse/handlers/room_member.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1d7e6997b9..9977be8831 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -370,7 +370,7 @@ class RoomMemberHandler(object): content["kind"] = "guest" ret = yield self._remote_join( - remote_room_hosts, room_id, target, content + requester, remote_room_hosts, room_id, target, content ) defer.returnValue(ret) @@ -392,7 +392,7 @@ class RoomMemberHandler(object): # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] res = yield self._remote_reject_invite( - remote_room_hosts, room_id, target, + requester, remote_room_hosts, room_id, target, ) defer.returnValue(res) @@ -849,7 +849,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): self.distributor.declare("user_left_room") @defer.inlineCallbacks - def _remote_join(self, remote_room_hosts, room_id, user, content): + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ if len(remote_room_hosts) == 0: @@ -868,7 +868,7 @@ class RoomMemberMasterHandler(RoomMemberHandler): yield self._user_joined_room(user, room_id) @defer.inlineCallbacks - def _remote_reject_invite(self, remote_room_hosts, room_id, target): + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler -- cgit 1.5.1 From b27320b5503d38e6bd05375b272c1a1d18656ea2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 16:32:37 +0000 Subject: Implement RoomMemberWorkerHandler --- synapse/handlers/room_member.py | 78 ++++++++ synapse/replication/http/__init__.py | 2 + synapse/replication/http/membership.py | 334 +++++++++++++++++++++++++++++++++ synapse/server.py | 6 +- 4 files changed, 418 insertions(+), 2 deletions(-) create mode 100644 synapse/replication/http/membership.py diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1d7e6997b9..3c5751d66c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -27,6 +27,10 @@ from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.replication.http.membership import ( + remote_join, remote_reject_invite, get_or_register_3pid_guest, + notify_user_membership_change, +) from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room @@ -929,3 +933,77 @@ class RoomMemberMasterHandler(RoomMemberHandler): if membership: yield self.store.forget(user_id, room_id) + + +class RoomMemberWorkerHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + ret = yield remote_join( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=user.to_string(), + content=content, + ) + + yield self._user_joined_room(user, room_id) + + defer.returnValue(ret) + + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + return remote_reject_invite( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=target.to_string(), + ) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="join", + ) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="left", + ) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + return get_or_register_3pid_guest( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + medium=medium, + address=address, + inviter_user_id=inviter_user_id, + ) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index b378b41646..7a148301de 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -15,6 +15,7 @@ import send_event +import membership from synapse.http.server import JsonResource @@ -29,3 +30,4 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) + membership.register_servlets(hs, self) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py new file mode 100644 index 0000000000..df16c3b2b4 --- /dev/null +++ b/synapse/replication/http/membership.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError, MatrixCodeMessageException +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.types import Requester, UserID +from synapse.util.distributor import user_left_room, user_joined_room + +import logging +import re + +logger = logging.getLogger(__name__) + + +@defer.inlineCallbacks +def remote_join(client, host, port, requester, remote_room_hosts, + room_id, user_id, content): + """Ask the master to do a remote join for the given user to the given room + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + remote_room_hosts (list[str]): Servers to try and join via + room_id (str) + user_id (str) + content (dict): The event content to use for the join event + + Returns: + Deferred + """ + uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port) + + payload = { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "room_id": room_id, + "user_id": user_id, + "content": content, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def remote_reject_invite(client, host, port, requester, remote_room_hosts, + room_id, user_id): + """Ask master to reject the invite for the user and room. + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + remote_room_hosts (list[str]): Servers to try and reject via + room_id (str) + user_id (str) + + Returns: + Deferred + """ + uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port) + + payload = { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "room_id": room_id, + "user_id": user_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def get_or_register_3pid_guest(client, host, port, requester, + medium, address, inviter_user_id): + """Ask the master to get/create a guest account for given 3PID. + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication + requester (Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + + uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) + + payload = { + "requester": requester.serialize(), + "medium": medium, + "address": address, + "inviter_user_id": inviter_user_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +@defer.inlineCallbacks +def notify_user_membership_change(client, host, port, user_id, room_id, change): + """Notify master that a user has joined or left the room + + Args: + client (SimpleHttpClient) + host (str): host of master + port (int): port on master listening for HTTP replication. + user_id (str) + room_id (str) + change (str): Either "join" or "left" + + Returns: + Deferred + """ + assert change in ("join", "left") + + uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change) + + payload = { + "user_id": user_id, + "room_id": room_id, + } + + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) + + +class ReplicationRemoteJoinRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/remote_join$")] + + def __init__(self, hs): + super(ReplicationRemoteJoinRestServlet, self).__init__() + + self.federation_handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + remote_room_hosts = content["remote_room_hosts"] + room_id = content["room_id"] + user_id = content["user_id"] + event_content = content["content"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info( + "remote_join: %s into room: %s", + user_id, room_id, + ) + + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user_id, + event_content, + ) + + defer.returnValue((200, {})) + + +class ReplicationRemoteRejectInviteRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] + + def __init__(self, hs): + super(ReplicationRemoteRejectInviteRestServlet, self).__init__() + + self.federation_handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + remote_room_hosts = content["remote_room_hosts"] + room_id = content["room_id"] + user_id = content["user_id"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info( + "remote_reject_invite: %s out of room: %s", + user_id, room_id, + ) + + try: + event = yield self.federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id, + ) + ret = event.get_pdu_json() + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + user_id, room_id + ) + ret = {} + + defer.returnValue((200, ret)) + + +class ReplicationRegister3PIDGuestRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] + + def __init__(self, hs): + super(ReplicationRegister3PIDGuestRestServlet, self).__init__() + + self.registeration_handler = hs.get_handlers().registration_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_POST(self, request): + content = parse_json_object_from_request(request) + + medium = content["medium"] + address = content["address"] + inviter_user_id = content["inviter_user_id"] + + requester = Requester.deserialize(self.store, content["requester"]) + + if requester.user: + request.authenticated_entity = requester.user.to_string() + + logger.info("get_or_register_3pid_guest: %r", content) + + ret = yield self.registeration_handler.get_or_register_3pid_guest( + medium, address, inviter_user_id, + ) + + defer.returnValue((200, ret)) + + +class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): + PATTERNS = [re.compile("^/_synapse/replication/user_(?Pjoin|left)_room$")] + + def __init__(self, hs): + super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() + + self.registeration_handler = hs.get_handlers().registration_handler + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.distributor = hs.get_distributor() + + def on_POST(self, request, change): + content = parse_json_object_from_request(request) + + user_id = content["user_id"] + room_id = content["room_id"] + + logger.info("user membership change: %s in %s", user_id, room_id) + + user = UserID.from_string(user_id) + + if change == "join": + user_joined_room(self.distributor, user, room_id) + elif change == "left": + user_left_room(self.distributor, user, room_id) + else: + raise Exception("Unrecognized change: %r", change) + + return (200, {}) + + +def register_servlets(hs, http_server): + ReplicationRemoteJoinRestServlet(hs).register(http_server) + ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) + ReplicationRegister3PIDGuestRestServlet(hs).register(http_server) + ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index 763f0f5a68..c48c96727e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -47,7 +47,9 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberMasterHandler +from synapse.handlers.room_member import ( + RoomMemberMasterHandler, RoomMemberWorkerHandler, +) from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler @@ -393,7 +395,7 @@ class HomeServer(object): def build_room_member_handler(self): if self.config.worker_app: - raise Exception("Can't use RoomMemberHandler on workers") + return RoomMemberWorkerHandler(self) return RoomMemberMasterHandler(self) def build_federation_registry(self): -- cgit 1.5.1 From a08726fc4262c6d8e9c69db3df322317cb87f6c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 15:20:54 +0000 Subject: Add is_blocked to worker store --- synapse/storage/room.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 7f2c08d7a6..34ed84ea22 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -157,6 +157,18 @@ class RoomWorkerStore(SQLBaseStore): "get_public_room_changes", get_public_room_changes_txn ) + @cached(max_entries=10000) + def is_room_blocked(self, room_id): + return self._simple_select_one_onecol( + table="blocked_rooms", + keyvalues={ + "room_id": room_id, + }, + retcol="1", + allow_none=True, + desc="is_room_blocked", + ) + class RoomStore(RoomWorkerStore, SearchStore): @@ -485,18 +497,6 @@ class RoomStore(RoomWorkerStore, SearchStore): else: defer.returnValue(None) - @cached(max_entries=10000) - def is_room_blocked(self, room_id): - return self._simple_select_one_onecol( - table="blocked_rooms", - keyvalues={ - "room_id": room_id, - }, - retcol="1", - allow_none=True, - desc="is_room_blocked", - ) - @defer.inlineCallbacks def block_room(self, room_id, user_id): yield self._simple_insert( @@ -507,7 +507,11 @@ class RoomStore(RoomWorkerStore, SearchStore): }, desc="block_room", ) - self.is_room_blocked.invalidate((room_id,)) + yield self.runInteraction( + "block_room_invalidation", + self._invalidate_cache_and_stream, + self.is_room_blocked, (room_id,), + ) def get_media_mxcs_in_room(self, room_id): """Retrieves all the local and remote media MXC URIs in a given room -- cgit 1.5.1 From d144ed6ffb578db83bff0f36826d25d9f09bcd30 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 22:36:04 +0000 Subject: fix bug #2926 (loading all state for a given type from the DB if the state_key is None) (#2990) Fixes a regression that had crept in where the caching layer upholds requests for loading state which is filtered by type (but not by state_key), but the DB layer itself would interpret a missing state_key as a request to filter by null state_key rather than returning all state_keys. --- synapse/storage/state.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b325e1c1f..ffa4246031 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -240,6 +240,9 @@ class StateGroupWorkerStore(SQLBaseStore): ( "AND type = ? AND state_key = ?", (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) ) for etype, state_key in types ] @@ -259,10 +262,19 @@ class StateGroupWorkerStore(SQLBaseStore): key = (typ, state_key) results[group][key] = event_id else: + where_args = [] + where_clauses = [] + wildcard_types = False if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) + for typ in types: + if typ[1] is None: + where_clauses.append("(type = ?)") + where_args.extend(typ[0]) + wildcard_types = True + else: + where_clauses.append("(type = ? AND state_key = ?)") + where_args.extend([typ[0], typ[1]]) + where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" @@ -279,7 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore): # after we finish deduping state, which requires this func) args = [next_group] if types: - args.extend(i for typ in types for i in typ) + args.extend(where_args) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" @@ -292,9 +304,17 @@ class StateGroupWorkerStore(SQLBaseStore): if (typ, state_key) not in results[group] ) - # If the lengths match then we must have all the types, - # so no need to go walk further down the tree. - if types is not None and len(results[group]) == len(types): + # If the number of entries in the (type,state_key)->event_id dict + # matches the number of (type,state_keys) types we were searching + # for, then we must have found them all, so no need to go walk + # further down the tree... UNLESS our types filter contained + # wildcards (i.e. Nones) in which case we have to do an exhaustive + # search + if ( + types is not None and + not wildcard_types and + len(results[group]) == len(types) + ): break next_group = self._simple_select_one_onecol_txn( -- cgit 1.5.1 From 3f0f06cb31d0293e75943e804b3e1ed8fe58ebed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Mar 2018 11:41:45 +0000 Subject: Split RoomMemberWorkerHandler to separate file --- synapse/handlers/room_member.py | 78 ------------------------- synapse/handlers/room_member_worker.py | 102 +++++++++++++++++++++++++++++++++ synapse/server.py | 5 +- 3 files changed, 104 insertions(+), 81 deletions(-) create mode 100644 synapse/handlers/room_member_worker.py diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 3c5751d66c..1d7e6997b9 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -27,10 +27,6 @@ from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes -from synapse.replication.http.membership import ( - remote_join, remote_reject_invite, get_or_register_3pid_guest, - notify_user_membership_change, -) from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room @@ -933,77 +929,3 @@ class RoomMemberMasterHandler(RoomMemberHandler): if membership: yield self.store.forget(user_id, room_id) - - -class RoomMemberWorkerHandler(RoomMemberHandler): - @defer.inlineCallbacks - def _remote_join(self, requester, remote_room_hosts, room_id, user, content): - """Implements RoomMemberHandler._remote_join - """ - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - ret = yield remote_join( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - remote_room_hosts=remote_room_hosts, - room_id=room_id, - user_id=user.to_string(), - content=content, - ) - - yield self._user_joined_room(user, room_id) - - defer.returnValue(ret) - - def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): - """Implements RoomMemberHandler._remote_reject_invite - """ - return remote_reject_invite( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - remote_room_hosts=remote_room_hosts, - room_id=room_id, - user_id=target.to_string(), - ) - - def _user_joined_room(self, target, room_id): - """Implements RoomMemberHandler._user_joined_room - """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - user_id=target.to_string(), - room_id=room_id, - change="join", - ) - - def _user_left_room(self, target, room_id): - """Implements RoomMemberHandler._user_left_room - """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - user_id=target.to_string(), - room_id=room_id, - change="left", - ) - - def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): - """Implements RoomMemberHandler.get_or_register_3pid_guest - """ - return get_or_register_3pid_guest( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - medium=medium, - address=address, - inviter_user_id=inviter_user_id, - ) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py new file mode 100644 index 0000000000..75a881d990 --- /dev/null +++ b/synapse/handlers/room_member_worker.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.handlers.room_member import RoomMemberHandler +from synapse.replication.http.membership import ( + remote_join, remote_reject_invite, get_or_register_3pid_guest, + notify_user_membership_change, +) + + +logger = logging.getLogger(__name__) + + +class RoomMemberWorkerHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + ret = yield remote_join( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=user.to_string(), + content=content, + ) + + yield self._user_joined_room(user, room_id) + + defer.returnValue(ret) + + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + return remote_reject_invite( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=target.to_string(), + ) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="join", + ) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="left", + ) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + return get_or_register_3pid_guest( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + medium=medium, + address=address, + inviter_user_id=inviter_user_id, + ) diff --git a/synapse/server.py b/synapse/server.py index c48c96727e..cd0c1a51be 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -47,9 +47,8 @@ from synapse.handlers.device import DeviceHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import ( - RoomMemberMasterHandler, RoomMemberWorkerHandler, -) +from synapse.handlers.room_member import RoomMemberMasterHandler +from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler -- cgit 1.5.1 From 62ad701326089ceebbc2683a2e0c9eeeaa734293 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Mar 2018 14:15:49 +0000 Subject: s/join/joined/ in notify_user_membership_change --- synapse/handlers/room_member_worker.py | 2 +- synapse/replication/http/membership.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 75a881d990..493aec1e48 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -73,7 +73,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): port=self.config.worker_replication_http_port, user_id=target.to_string(), room_id=room_id, - change="join", + change="joined", ) def _user_left_room(self, target, room_id): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index df16c3b2b4..fcd038a8a2 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -154,7 +154,7 @@ def notify_user_membership_change(client, host, port, user_id, room_id, change): Returns: Deferred """ - assert change in ("join", "left") + assert change in ("joined", "left") uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change) @@ -297,7 +297,7 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet): class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/user_(?Pjoin|left)_room$")] + PATTERNS = [re.compile("^/_synapse/replication/user_(?Pjoined|left)_room$")] def __init__(self, hs): super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() @@ -317,7 +317,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): user = UserID.from_string(user_id) - if change == "join": + if change == "joined": user_joined_room(self.distributor, user, room_id) elif change == "left": user_left_room(self.distributor, user, room_id) -- cgit 1.5.1 From 0011ede3b0ba9de22af3bb36c300b6779918bfa1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Mar 2018 14:19:23 +0000 Subject: Fix imports --- synapse/replication/http/__init__.py | 5 +---- synapse/replication/http/membership.py | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 7a148301de..1d7a607529 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -13,11 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import send_event -import membership - from synapse.http.server import JsonResource +from synapse.replication.http import membership, send_event REPLICATION_PREFIX = "/_synapse/replication" diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index fcd038a8a2..e66c4e881f 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +import re + from twisted.internet import defer from synapse.api.errors import SynapseError, MatrixCodeMessageException @@ -20,9 +23,6 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import Requester, UserID from synapse.util.distributor import user_left_room, user_joined_room -import logging -import re - logger = logging.getLogger(__name__) -- cgit 1.5.1 From 4f28018c83c071d0a70a545ec953bd1399c55c7e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 15:21:11 +0000 Subject: Register membership/state servlets in event_creator --- synapse/app/event_creator.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index eb593c5278..172e989b54 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -31,14 +31,20 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.room import RoomSendEventRestServlet +from synapse.rest.client.v1.room import ( + RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet, + JoinRoomAliasServlet, +) from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -52,6 +58,9 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( + DirectoryStore, + TransactionStore, + SlavedProfileStore, SlavedAccountDataStore, SlavedPusherStore, SlavedReceiptsStore, @@ -85,6 +94,9 @@ class EventCreatorServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) RoomSendEventRestServlet(self).register(resource) + RoomMembershipRestServlet(self).register(resource) + RoomStateEventRestServlet(self).register(resource) + JoinRoomAliasServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, -- cgit 1.5.1