diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/replication.py | 16 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 72 | ||||
-rw-r--r-- | synapse/state.py | 2 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 54 | ||||
-rw-r--r-- | synapse/storage/state.py | 2 |
5 files changed, 99 insertions, 47 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 89c0ef49e9..6bfb30b42d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -559,7 +559,7 @@ class ReplicationLayer(object): if not exists: try: logger.debug( - "Getting missing auth event %s from %s", + "_handle_new_pdu fetch missing auth event %s from %s", e_id, origin, ) @@ -585,6 +585,11 @@ class ReplicationLayer(object): pdu.room_id ) + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + if min_depth and pdu.depth > min_depth: for event_id, hashes in pdu.prev_events: exists = yield self._get_persisted_pdu( @@ -594,7 +599,10 @@ class ReplicationLayer(object): ) if not exists: - logger.debug("Requesting pdu %s", event_id) + logger.debug( + "_handle_new_pdu requesting pdu %s", + event_id + ) try: yield self.get_pdu( @@ -608,6 +616,10 @@ class ReplicationLayer(object): else: # We need to get the state at this event, since we have reached # a backward extremity edge. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) state = yield self.get_state_for_context( origin, pdu.room_id, pdu.event_id, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14066ac4f3..252c1f1684 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -144,11 +144,24 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) + # FIXME (erikj): Awful hack to make the case where we are not currently + # in the room work + current_state = None + if state: + is_in_room = yield self.auth.check_host_in_room( + event.room_id, + self.server_name + ) + if not is_in_room: + logger.debug("Got event for room we're not in.") + current_state = state + try: yield self._handle_new_event( event, state=state, - backfilled=backfilled + backfilled=backfilled, + current_state=current_state, ) except AuthError as e: raise FederationError( @@ -161,29 +174,11 @@ class FederationHandler(BaseHandler): room = yield self.store.get_room(event.room_id) if not room: - # Huh, let's try and get the current state - try: - yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, event.event_id, - ) - - hosts = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - if self.hs.hostname in hosts: - try: - yield self.store.store_room( - room_id=event.room_id, - room_creator_user_id="", - is_public=False, - ) - except: - pass - except: - logger.exception( - "Failed to get current state for room %s", - event.room_id - ) + yield self.store.store_room( + room_id=event.room_id, + room_creator_user_id="", + is_public=False, + ) if not backfilled: extra_users = [] @@ -244,6 +239,8 @@ class FederationHandler(BaseHandler): pdu=event ) + + defer.returnValue(pdu) @defer.inlineCallbacks @@ -327,13 +324,23 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.outlier = True yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] + ) for e in state: # FIXME: Auth these. e.outlier = True yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] + ) - yield self._handle_new_event(event, state=state) + yield self._handle_new_event( + event, + state=state, + current_state=state + ) yield self.notifier.on_new_room_event( event, extra_users=[joinee] @@ -554,7 +561,12 @@ class FederationHandler(BaseHandler): waiters.pop().callback(None) @defer.inlineCallbacks - def _handle_new_event(self, event, state=None, backfilled=False): + def _handle_new_event(self, event, state=None, backfilled=False, + current_state=None): + if state: + for s in state: + yield self._handle_new_event(s) + is_new_state = yield self.state_handler.annotate_event_with_state( event, old_state=state @@ -594,7 +606,10 @@ class FederationHandler(BaseHandler): ) if not e: - raise AuthError(403, "Can't find auth event.") + raise AuthError( + 403, + "Can't find auth event %s." % (e_id, ) + ) auth_events[(e.type, e.state_key)] = e @@ -603,5 +618,6 @@ class FederationHandler(BaseHandler): yield self.store.persist_event( event, backfilled=backfilled, - is_new_state=(is_new_state and not backfilled) + is_new_state=(is_new_state and not backfilled), + current_state=current_state, ) diff --git a/synapse/state.py b/synapse/state.py index 1c999e4d79..430665f7ba 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -82,7 +82,7 @@ class StateHandler(object): if hasattr(event, "outlier") and event.outlier: event.state_group = None event.old_state_events = None - event.state_events = {} + event.state_events = None defer.returnValue(False) return diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 1231794de0..a5ee923bc3 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -93,7 +93,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False, is_new_state=True): + def persist_event(self, event, backfilled=False, is_new_state=True, + current_state=None): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -109,6 +110,7 @@ class DataStore(RoomMemberStore, RoomStore, backfilled=backfilled, stream_ordering=stream_ordering, is_new_state=is_new_state, + current_state=current_state, ) except _RollbackButIsFineException: pass @@ -137,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore, @log_function def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, - is_new_state=True): + is_new_state=True, current_state=None): if event.type == RoomMemberEvent.TYPE: self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: @@ -206,8 +208,24 @@ class DataStore(RoomMemberStore, RoomStore, self._store_state_groups_txn(txn, event) + if current_state: + txn.execute("DELETE FROM current_state_events") + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + is_state = hasattr(event, "state_key") and event.state_key is not None - if is_new_state and is_state: + if is_state: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -225,17 +243,18 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) + if is_new_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) for e_id, h in event.prev_state: self._simple_insert_txn( @@ -312,7 +331,12 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - self._update_min_depth_for_room_txn(txn, event.room_id, event.depth) + if not outlier: + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) def _store_redaction(self, txn, event): txn.execute( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55ea567793..e0f44b3e59 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -87,7 +87,7 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event): - if not event.state_events: + if event.state_events is None: return state_group = event.state_group |