From 75b4329aaad400abcee4b23e9c88ff845e0d73c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Dec 2014 16:07:21 +0000 Subject: WIP for new way of managing events. --- synapse/events/snapshot.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 synapse/events/snapshot.py (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py new file mode 100644 index 0000000000..ca15ec09ae --- /dev/null +++ b/synapse/events/snapshot.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class EventSnapshot(object): + def __init__(self, prev_events, depth, current_state, + current_state_group): + self._prev_events = prev_events + self._depth = depth + self._current_state = current_state + self._current_state_group = current_state_group + + +class EventCache(object): + def __init__(self, store): + self._store = store + + self._cache = {} + + @defer.inlineCallbacks + def load_event(self, event_id): + event = self._cache.get(event_id, None) + + if not event: + event = yield self._store.get_event( + event_id, + allow_none=True + ) + + if event: + self._cache[event_id] = event + + defer.returnValue(event) + + def load_event_from_cache(self, event_id): + return self._cache.get(event_id, None) + + def add_to_cache(self, *events): + self._cache.update({ + event.event_id: event + for event in events + }) + + +class EventContext(object): + + def __init__(self, current_state, auth_events): + self.current_state = current_state + self.auth_events = auth_events -- cgit 1.4.1 From d044121168672c657e595525af9b588c8769e9bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Dec 2014 09:08:26 +0000 Subject: Various typos and bug fixes. --- synapse/api/auth.py | 16 ++++--- synapse/events/snapshot.py | 3 +- synapse/events/utils.py | 2 + synapse/federation/replication.py | 12 +++-- synapse/handlers/_base.py | 8 ++-- synapse/handlers/federation.py | 97 +++++++++++++++------------------------ synapse/state.py | 16 +++++-- synapse/storage/state.py | 6 +-- 8 files changed, 80 insertions(+), 80 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3f2e58a5ef..821e3ba5e2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomJoinRulesEvent, RoomCreateEvent, RoomAliasesEvent, ) from synapse.util.logutils import log_function +from synapse.util.async import run_on_reactor from syutil.base64util import encode_base64 import logging @@ -352,17 +353,19 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): + yield run_on_reactor() + if builder.type == RoomCreateEvent.TYPE: builder.auth_events = [] return - auth_events = [] + auth_ids = [] key = (RoomPowerLevelsEvent.TYPE, "", ) power_level_event = context.current_state.get(key) if power_level_event: - auth_events.append(power_level_event.event_id) + auth_ids.append(power_level_event.event_id) key = (RoomJoinRulesEvent.TYPE, "", ) join_rule_event = context.current_state.get(key) @@ -373,7 +376,7 @@ class Auth(object): key = (RoomCreateEvent.TYPE, "", ) create_event = context.current_state.get(key) if create_event: - auth_events.append(create_event.event_id) + auth_ids.append(create_event.event_id) if join_rule_event: join_rule = join_rule_event.content.get("join_rule") @@ -385,15 +388,14 @@ class Auth(object): e_type = builder.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: - auth_events.append(join_rule_event.event_id) + auth_ids.append(join_rule_event.event_id) if member_event and not is_public: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) elif member_event: if member_event.content["membership"] == Membership.JOIN: - auth_events.append(member_event.event_id) + auth_ids.append(member_event.event_id) - auth_ids = [(a.event_id, h) for a, h in auth_events] auth_events_entries = yield self.store.add_event_hashes( auth_ids ) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ca15ec09ae..e0cbacc19c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -58,6 +58,7 @@ class EventCache(object): class EventContext(object): - def __init__(self, current_state, auth_events): + def __init__(self, current_state=None, auth_events=None): self.current_state = current_state self.auth_events = auth_events + self.state_group = None diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1b05ee0a95..485f075406 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -95,4 +95,6 @@ def serialize_event(hs, e): d["unsigned"]["age"] = now - d["unsigned"]["age_ts"] del d["unsigned"]["age_ts"] + d["user_id"] = d.pop("sender", None) + return d \ No newline at end of file diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index b11df9e5c6..3af24ee46d 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -558,7 +558,13 @@ class ReplicationLayer(object): origin, pdu.event_id, do_auth=False ) - if existing and (not existing.outlier or pdu.outlier): + already_seen = ( + existing and ( + not existing.internal_metadata.outlier + or pdu.internal_metadata.outlier + ) + ) + if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) defer.returnValue({}) return @@ -596,7 +602,7 @@ class ReplicationLayer(object): # ) # Get missing pdus if necessary. - if not pdu.outlier: + if not pdu.internal_metadata.outlier: # We only backfill backwards to the min depth. min_depth = yield self.handler.get_min_depth_for_context( pdu.room_id @@ -663,7 +669,7 @@ class ReplicationLayer(object): pdu_json ) - builder.internal_metadata = outlier + builder.internal_metadata.outlier = outlier return builder.build() diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 810ce138ff..0bff644192 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -62,6 +62,8 @@ class BaseHandler(object): @defer.inlineCallbacks def _create_new_client_event(self, builder): + yield run_on_reactor() + context = EventContext() latest_ret = yield self.store.get_latest_events_in_room( @@ -79,7 +81,7 @@ class BaseHandler(object): builder, context, ) - group, prev_state = ret + prev_state = ret if builder.is_state(): prev_state = yield self.store.add_event_hashes( @@ -88,8 +90,6 @@ class BaseHandler(object): builder.prev_state = prev_state - builder.internal_metadata.state_group = group - yield self.auth.add_auth_events(builder, context) add_hashes_and_signatures( @@ -105,6 +105,8 @@ class BaseHandler(object): @defer.inlineCallbacks def handle_new_client_event(self, event, context, extra_destinations=[], extra_users=[], suppress_auth=False): + yield run_on_reactor() + # We now need to go and hit out to wherever we need to hit out to. if not suppress_auth: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5264e3eafc..38ee32d26e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -149,7 +149,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.outlier: + if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") replication_layer = self.replication_layer @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -180,7 +180,7 @@ class FederationHandler(BaseHandler): if state: for e in state: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e) except: @@ -254,11 +254,18 @@ class FederationHandler(BaseHandler): event = pdu # FIXME (erikj): Not sure this actually works :/ - yield self.state_handler.annotate_event_with_state(event) + context = EventContext() + yield self.state_handler.annotate_context_with_state(event, context) - events.append(event) + events.append( + (event, context) + ) - yield self.store.persist_event(event, backfilled=True) + yield self.store.persist_event( + event, + context=context, + backfilled=True + ) defer.returnValue(events) @@ -326,7 +333,7 @@ class FederationHandler(BaseHandler): assert(event.state_key == joinee) assert(event.room_id == room_id) - event.outlier = False + event.internal_metadata.outlier = False self.room_queues[room_id] = [] @@ -369,7 +376,7 @@ class FederationHandler(BaseHandler): pass for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event(e, fetch_missing=False) except: @@ -380,7 +387,7 @@ class FederationHandler(BaseHandler): for e in state: # FIXME: Auth these. - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event( e, @@ -448,7 +455,7 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = False + event.internal_metadata.outlier = False yield self._handle_new_event(event) @@ -643,70 +650,42 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, event, state=None, backfilled=False, current_state=None, fetch_missing=True): context = EventContext() - is_new_state = yield self.state_handler.annotate_event_with_state( + yield self.state_handler.annotate_context_with_state( event, old_state=state ) - if event.old_state_events: - known_ids = set( - [s.event_id for s in event.old_state_events.values()] - ) - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event( - e_id, - allow_none=True, - ) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in %s", - event.event_id, e_id, known_ids, - ) - raise AuthError(403, "Auth events are stale") + is_new_state = not event.internal_metadata.outlier - auth_events = event.old_state_events - else: - # We need to get the auth events from somewhere. - - # TODO: Don't just hit the DBs? - - auth_events = {} - for e_id, _ in event.auth_events: + known_ids = set( + [s.event_id for s in context.auth_events.values()] + ) + for e_id, _ in event.auth_events: + if e_id not in known_ids: e = yield self.store.get_event( e_id, + context, allow_none=True, ) if not e: - e = yield self.replication_layer.get_pdu( - event.origin, e_id, outlier=True + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in %s", + event.event_id, e_id, known_ids, ) + raise AuthError(403, "Auth events are stale") - if e and fetch_missing: - try: - yield self.on_receive_pdu(event.origin, e, False) - except: - logger.exception( - "Failed to parse auth event %s", - e_id, - ) + context.auth_events[(e.type, e.state_key)] = e - if not e: - logger.warn("Can't find auth event %s.", e_id) + if event.type == RoomMemberEvent.TYPE and not event.auth_events: + if len(event.prev_events) == 1: + c = yield self.store.get_event(event.prev_events[0][0]) + if c.type == RoomCreateEvent.TYPE: + context.auth_events[(c.type, c.state_key)] = c - auth_events[(e.type, e.state_key)] = e - - if event.type == RoomMemberEvent.TYPE and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == RoomCreateEvent.TYPE: - auth_events[(c.type, c.state_key)] = c - - self.auth.check(event, auth_events=auth_events) + self.auth.check(event, auth_events=context.auth_events) yield self.store.persist_event( event, diff --git a/synapse/state.py b/synapse/state.py index cbb4243fad..464cbae564 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -136,7 +136,16 @@ class StateHandler(object): defer.returnValue(res[1].values()) @defer.inlineCallbacks - def annotate_context_with_state(self, event, context): + def annotate_context_with_state(self, event, context, old_state=None): + yield run_on_reactor() + + if old_state: + context.current_state = { + (s.type, s.state_key): s for s in old_state + } + context.state_group = None + defer.returnValue([]) + if event.is_state(): ret = yield self.resolve_state_groups( [e for e, _ in event.prev_events], @@ -151,6 +160,7 @@ class StateHandler(object): group, curr_state, prev_state = ret context.current_state = curr_state + context.state_group = group prev_state = yield self.store.add_event_hashes( prev_state @@ -164,9 +174,7 @@ class StateHandler(object): if v.event_id in auth_ids } - defer.returnValue( - (group, prev_state) - ) + defer.returnValue(prev_state) @defer.inlineCallbacks @log_function diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b8e721ad72..afe3e5edea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -87,10 +87,10 @@ class StateStore(SQLBaseStore): ) def _store_state_groups_txn(self, txn, event, context): - if context.current_state_events is None: + if context.current_state is None: return - state_events = context.current_state_events + state_events = context.current_state if event.is_state(): state_events[(event.type, event.state_key)] = event @@ -107,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in context.state_events.values(): + for state in state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", -- cgit 1.4.1 From 670dcdfc14d855c45b076f852673906dc450f515 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2014 16:16:58 +0000 Subject: Remove unused functions --- synapse/events/snapshot.py | 42 ------------------------------------------ 1 file changed, 42 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e0cbacc19c..b9fb29be01 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,48 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - - -class EventSnapshot(object): - def __init__(self, prev_events, depth, current_state, - current_state_group): - self._prev_events = prev_events - self._depth = depth - self._current_state = current_state - self._current_state_group = current_state_group - - -class EventCache(object): - def __init__(self, store): - self._store = store - - self._cache = {} - - @defer.inlineCallbacks - def load_event(self, event_id): - event = self._cache.get(event_id, None) - - if not event: - event = yield self._store.get_event( - event_id, - allow_none=True - ) - - if event: - self._cache[event_id] = event - - defer.returnValue(event) - - def load_event_from_cache(self, event_id): - return self._cache.get(event_id, None) - - def add_to_cache(self, *events): - self._cache.update({ - event.event_id: event - for event in events - }) - class EventContext(object): -- cgit 1.4.1