Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/homeserver.py              | 10
-rw-r--r-- | synapse/config/server.py               |  6
-rw-r--r-- | synapse/federation/federation_base.py  | 11
-rw-r--r-- | synapse/federation/transport/server.py |  2
-rw-r--r-- | synapse/handlers/_base.py              |  4
-rw-r--r-- | synapse/handlers/federation.py         | 53
-rw-r--r-- | synapse/rest/media/v1/base_resource.py |  3
-rw-r--r-- | synapse/state.py                       | 22
-rw-r--r-- | synapse/storage/_base.py               | 45
-rw-r--r-- | synapse/storage/state.py               | 20
10 files changed, 134 insertions, 42 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index d93afdc1c2..65a5dfa84e 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -87,10 +87,16 @@ class SynapseHomeServer(HomeServer):
         return MatrixFederationHttpClient(self)
 
     def build_resource_for_client(self):
-        return gz_wrap(ClientV1RestResource(self))
+        res = ClientV1RestResource(self)
+        if self.config.gzip_responses:
+            res = gz_wrap(res)
+        return res
 
     def build_resource_for_client_v2_alpha(self):
-        return gz_wrap(ClientV2AlphaRestResource(self))
+        res = ClientV2AlphaRestResource(self)
+        if self.config.gzip_responses:
+            res = gz_wrap(res)
+        return res
 
     def build_resource_for_federation(self):
         return JsonResource(self)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 48a26c65d9..d0c8fb8f3c 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,6 +29,7 @@ class ServerConfig(Config):
         self.soft_file_limit = config["soft_file_limit"]
         self.daemonize = config.get("daemonize")
         self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+        self.gzip_responses = config["gzip_responses"]
 
         # Attempt to guess the content_addr for the v0 content repostitory
         content_addr = config.get("content_addr")
@@ -86,6 +87,11 @@ class ServerConfig(Config):
         # Turn on the twisted telnet manhole service on localhost on the given
         # port.
         #manhole: 9000
+
+        # Should synapse compress HTTP responses to clients that support it?
+        # This should be disabled if running synapse behind a load balancer
+        # that can do automatic compression.
+        gzip_responses: True
         """ % locals()
 
     def read_arguments(self, args):
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index f0430b2cb1..299493af91 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -18,8 +18,6 @@
 from twisted.internet import defer
 
 from synapse.events.utils import prune_event
 
-from syutil.jsonutil import encode_canonical_json
-
 from synapse.crypto.event_signing import check_event_content_hash
 from synapse.api.errors import SynapseError
@@ -120,16 +118,15 @@ class FederationBase(object):
                 )
             except SynapseError:
                 logger.warn(
-                    "Signature check failed for %s redacted to %s",
-                    encode_canonical_json(pdu.get_pdu_json()),
-                    encode_canonical_json(redacted_pdu_json),
+                    "Signature check failed for %s",
+                    pdu.event_id,
                 )
                 raise
 
         if not check_event_content_hash(pdu):
             logger.warn(
-                "Event content has been tampered, redacting %s, %s",
-                pdu.event_id, encode_canonical_json(pdu.get_dict())
+                "Event content has been tampered, redacting.",
+                pdu.event_id,
             )
             defer.returnValue(redacted_event)
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index af87805f34..31190e700a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -93,6 +93,8 @@ class TransportLayerServer(object):
 
         yield self.keyring.verify_json_for_server(origin, json_request)
 
+        logger.info("Request from %s", origin)
+
         defer.returnValue((origin, content))
 
     @log_function
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 833ff41377..d6c064b398 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,9 @@ class BaseHandler(object):
         context = yield state_handler.compute_event_context(builder)
 
         if builder.is_state():
-            builder.prev_state = context.prev_state_events
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
 
         yield self.auth.add_auth_events(builder, context)
 
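Note on the homeserver.py hunk above: the client resources are now only wrapped with gz_wrap when the new gzip_responses option (read in config/server.py) is set. A rough sketch of the underlying pattern, assuming gz_wrap is a thin wrapper over Twisted's stock gzip support (an assumption, not something this diff shows; maybe_gzip is a hypothetical helper name):

# Sketch only: shows why the flag exists - when a load balancer or reverse
# proxy in front of synapse already compresses responses, wrapping again here
# is wasted work, so the wrapping becomes opt-in via config.
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory


def maybe_gzip(resource, gzip_responses):
    if gzip_responses:
        return EncodingResourceWrapper(resource, [GzipEncoderFactory()])
    return resource


root = Resource()
client_resource = maybe_gzip(root, gzip_responses=True)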
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 46ce3699d7..b5d882fd65 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -247,9 +247,15 @@ class FederationHandler(BaseHandler):
             if set(e_id for e_id, _ in ev.prev_events) - event_ids
         ]
 
+        logger.info(
+            "backfill: Got %d events with %d edges",
+            len(events), len(edges),
+        )
+
         # For each edge get the current state.
 
         auth_events = {}
+        state_events = {}
         events_to_state = {}
         for e_id in edges:
             state, auth = yield self.replication_layer.get_state_for_room(
@@ -258,12 +264,46 @@ class FederationHandler(BaseHandler):
                 event_id=e_id
             )
             auth_events.update({a.event_id: a for a in auth})
+            auth_events.update({s.event_id: s for s in state})
+            state_events.update({s.event_id: s for s in state})
             events_to_state[e_id] = state
 
+        seen_events = yield self.store.have_events(
+            set(auth_events.keys()) | set(state_events.keys())
+        )
+
+        all_events = events + state_events.values() + auth_events.values()
+        required_auth = set(
+            a_id for event in all_events for a_id, _ in event.auth_events
+        )
+
+        missing_auth = required_auth - set(auth_events)
+        results = yield defer.gatherResults(
+            [
+                self.replication_layer.get_pdu(
+                    [dest],
+                    event_id,
+                    outlier=True,
+                    timeout=10000,
+                )
+                for event_id in missing_auth
+            ],
+            consumeErrors=True
+        ).addErrback(unwrapFirstError)
+        auth_events.update({a.event_id: a for a in results})
+
         yield defer.gatherResults(
             [
-                self._handle_new_event(dest, a)
+                self._handle_new_event(
+                    dest, a,
+                    auth_events={
+                        (auth_events[a_id].type, auth_events[a_id].state_key):
+                        auth_events[a_id]
+                        for a_id, _ in a.auth_events
+                    },
+                )
                 for a in auth_events.values()
+                if a.event_id not in seen_events
             ],
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
@@ -274,6 +314,11 @@ class FederationHandler(BaseHandler):
                     dest, event_map[e_id],
                     state=events_to_state[e_id],
                     backfilled=True,
+                    auth_events={
+                        (auth_events[a_id].type, auth_events[a_id].state_key):
+                        auth_events[a_id]
+                        for a_id, _ in event_map[e_id].auth_events
+                    },
                 )
                 for e_id in events_to_state
             ],
@@ -900,8 +945,10 @@ class FederationHandler(BaseHandler):
             event.event_id, event.signatures,
         )
 
+        outlier = event.internal_metadata.is_outlier()
+
         context = yield self.state_handler.compute_event_context(
-            event, old_state=state
+            event, old_state=state, outlier=outlier,
        )
 
         if not auth_events:
@@ -912,7 +959,7 @@ class FederationHandler(BaseHandler):
             event.event_id, auth_events,
         )
 
-        is_new_state = not event.internal_metadata.is_outlier()
+        is_new_state = not outlier
 
         # This is a hack to fix some old rooms where the initial join event
         # didn't reference the create event in its auth events.
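The backfill change above now works out which auth events are referenced but not yet known, and fetches them in parallel before handling the backfilled events. A minimal sketch of that "compute the missing set, fetch it concurrently" step, with defer.succeed standing in for replication_layer.get_pdu and hypothetical helper names (fetch_missing_auth, fetch_one); the real code also passes outlier=True, a timeout, and chains .addErrback(unwrapFirstError):

from twisted.internet import defer


@defer.inlineCallbacks
def fetch_missing_auth(known_events, required_auth, fetch_one):
    # required_auth: every event_id referenced via auth_events edges.
    # known_events: {event_id: event} we already hold locally.
    missing = set(required_auth) - set(known_events)
    results = yield defer.gatherResults(
        [fetch_one(event_id) for event_id in missing],
        consumeErrors=True,
    )
    known_events.update({ev["event_id"]: ev for ev in results})
    defer.returnValue(known_events)


# Usage with already-fired deferreds, so no reactor is needed:
events = {"$a": {"event_id": "$a"}}
d = fetch_missing_auth(
    events,
    required_auth={"$a", "$b"},
    fetch_one=lambda eid: defer.succeed({"event_id": eid}),
)
out = []
d.addCallback(out.append)
assert sorted(out[0]) == ["$a", "$b"]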
diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py
index c8970165c2..6c83a9478c 100644
--- a/synapse/rest/media/v1/base_resource.py
+++ b/synapse/rest/media/v1/base_resource.py
@@ -15,6 +15,7 @@
 
 from .thumbnailer import Thumbnailer
 
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.http.server import respond_with_json
 from synapse.util.stringutils import random_string
 from synapse.api.errors import (
@@ -52,7 +53,7 @@ class BaseMediaResource(Resource):
     def __init__(self, hs, filepaths):
         Resource.__init__(self)
         self.auth = hs.get_auth()
-        self.client = hs.get_http_client()
+        self.client = MatrixFederationHttpClient(hs)
         self.clock = hs.get_clock()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
diff --git a/synapse/state.py b/synapse/state.py
index 9dddb77d5b..80da90a72c 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -106,7 +106,7 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def compute_event_context(self, event, old_state=None):
+    def compute_event_context(self, event, old_state=None, outlier=False):
         """ Fills out the context with the `current state` of the graph. The
         `current state` here is defined to be the state of the event graph
         just before the event - i.e. it never includes `event`
@@ -119,9 +119,23 @@ class StateHandler(object):
         Returns:
             an EventContext
         """
+        yield run_on_reactor()
+
         context = EventContext()
 
-        yield run_on_reactor()
+        if outlier:
+            # If this is an outlier, then we know it shouldn't have any current
+            # state. Certainly store.get_current_state won't return any, and
+            # persisting the event won't store the state group.
+            if old_state:
+                context.current_state = {
+                    (s.type, s.state_key): s for s in old_state
+                }
+            else:
+                context.current_state = {}
+            context.prev_state_events = []
+            context.state_group = None
+            defer.returnValue(context)
 
         if old_state:
             context.current_state = {
@@ -155,10 +169,6 @@ class StateHandler(object):
         context.current_state = curr_state
         context.state_group = group if not event.is_state() else None
 
-        prev_state = yield self.store.add_event_hashes(
-            prev_state
-        )
-
         if event.is_state():
             key = (event.type, event.state_key)
             if key in context.current_state:
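The state.py hunk above adds an early return in compute_event_context for outliers: per the new comment, an outlier carries no current state of its own and is never given a state group. A tiny synchronous sketch of what that branch produces, using namedtuples as hypothetical stand-ins for Synapse's EventContext and state events:

import collections

EventContext = collections.namedtuple(
    "EventContext", ["current_state", "prev_state_events", "state_group"]
)
StateEvent = collections.namedtuple("StateEvent", ["type", "state_key"])


def outlier_context(old_state=None):
    # Mirrors the outlier branch: key any supplied old_state by
    # (type, state_key), leave prev_state_events empty, no state group.
    current_state = {(s.type, s.state_key): s for s in (old_state or [])}
    return EventContext(
        current_state=current_state,
        prev_state_events=[],
        state_group=None,
    )


ctx = outlier_context([StateEvent("m.room.create", "")])
assert ctx.state_group is None
assert ("m.room.create", "") in ctx.current_state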
""" - def wrap(orig): + def __init__(self, orig, max_entries=1000, num_args=1, lru=False): + self.orig = orig + + self.max_entries = max_entries + self.num_args = num_args + self.lru = lru + + def __get__(self, obj, objtype=None): cache = Cache( - name=orig.__name__, - max_entries=max_entries, - keylen=num_args, - lru=lru, + name=self.orig.__name__, + max_entries=self.max_entries, + keylen=self.num_args, + lru=self.lru, ) - @functools.wraps(orig) + @functools.wraps(self.orig) @defer.inlineCallbacks - def wrapped(self, *keyargs): + def wrapped(*keyargs): try: - cached_result = cache.get(*keyargs) + cached_result = cache.get(*keyargs[:self.num_args]) if DEBUG_CACHES: - actual_result = yield orig(self, *keyargs) + actual_result = yield self.orig(obj, *keyargs) if actual_result != cached_result: logger.error( "Stale cache entry %s%r: cached: %r, actual %r", - orig.__name__, keyargs, + self.orig.__name__, keyargs, cached_result, actual_result, ) raise ValueError("Stale cache entry") @@ -170,18 +177,28 @@ def cached(max_entries=1000, num_args=1, lru=False): # while the SELECT is executing (SYN-369) sequence = cache.sequence - ret = yield orig(self, *keyargs) + ret = yield self.orig(obj, *keyargs) - cache.update(sequence, *keyargs + (ret,)) + cache.update(sequence, *keyargs[:self.num_args] + (ret,)) defer.returnValue(ret) wrapped.invalidate = cache.invalidate wrapped.invalidate_all = cache.invalidate_all wrapped.prefill = cache.prefill + + obj.__dict__[self.orig.__name__] = wrapped + return wrapped - return wrap + +def cached(max_entries=1000, num_args=1, lru=False): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru + ) class LoggingTransaction(object): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b24de34f23..f2b17f29ea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -81,19 +81,23 @@ class StateStore(SQLBaseStore): f, ) - @defer.inlineCallbacks - def c(vals): - vals[:] = yield self._get_events(vals, get_prev_content=False) - - yield defer.gatherResults( + state_list = yield defer.gatherResults( [ - c(vals) - for vals in states.values() + self._fetch_events_for_group(group, vals) + for group, vals in states.items() ], consumeErrors=True, ) - defer.returnValue(states) + defer.returnValue(dict(state_list)) + + @cached(num_args=1) + def _fetch_events_for_group(self, state_group, events): + return self._get_events( + events, get_prev_content=False + ).addCallback( + lambda evs: (state_group, evs) + ) def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: |