diff options
-rw-r--r-- | synapse/config/server.py | 3 | ||||
-rw-r--r-- | synapse/config/tls.py | 15 | ||||
-rw-r--r-- | synapse/crypto/context_factory.py | 5 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 13 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 121 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 8 | ||||
-rw-r--r-- | synapse/handlers/events.py | 9 | ||||
-rw-r--r-- | synapse/http/server.py | 44 | ||||
-rw-r--r-- | synapse/notifier.py | 16 | ||||
-rw-r--r-- | synapse/util/lrucache.py | 7 |
10 files changed, 188 insertions, 53 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py index 4e4892d40b..b042d4eed9 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -30,7 +30,6 @@ class ServerConfig(Config): self.pid_file = self.abspath(args.pid_file) self.webclient = True self.manhole = args.manhole - self.no_tls = args.no_tls self.soft_file_limit = args.soft_file_limit if not args.content_addr: @@ -76,8 +75,6 @@ class ServerConfig(Config): server_group.add_argument("--content-addr", default=None, help="The host and scheme to use for the " "content repository") - server_group.add_argument("--no-tls", action='store_true', - help="Don't bind to the https port.") server_group.add_argument("--soft-file-limit", type=int, default=0, help="Set the soft limit on the number of " "file descriptors synapse can use. " diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 384b29e7ba..034f9a7bf0 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -28,9 +28,16 @@ class TlsConfig(Config): self.tls_certificate = self.read_tls_certificate( args.tls_certificate_path ) - self.tls_private_key = self.read_tls_private_key( - args.tls_private_key_path - ) + + self.no_tls = args.no_tls + + if self.no_tls: + self.tls_private_key = None + else: + self.tls_private_key = self.read_tls_private_key( + args.tls_private_key_path + ) + self.tls_dh_params_path = self.check_file( args.tls_dh_params_path, "tls_dh_params" ) @@ -45,6 +52,8 @@ class TlsConfig(Config): help="PEM encoded private key for TLS") tls_group.add_argument("--tls-dh-params-path", help="PEM dh parameters for ephemeral keys") + tls_group.add_argument("--no-tls", action='store_true', + help="Don't bind to the https port.") def read_tls_certificate(self, cert_path): cert_pem = self.read_file(cert_path, "tls_certificate") diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 24d4abf3e9..2f8618a0df 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -38,7 +38,10 @@ class ServerContextFactory(ssl.ContextFactory): logger.exception("Failed to enable eliptic curve for TLS") context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) context.use_certificate(config.tls_certificate) - context.use_privatekey(config.tls_private_key) + + if not config.no_tls: + context.use_privatekey(config.tls_private_key) + context.load_tmp_dh(config.tls_dh_params_path) context.set_cipher_list("!ADH:HIGH+kEDH:!AECDH:HIGH+kEECDH") diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 828aced44a..f4db7b8a05 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -50,18 +50,27 @@ class Keyring(object): ) try: verify_key = yield self.get_server_verify_key(server_name, key_ids) - except IOError: + except IOError as e: + logger.warn( + "Got IOError when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) raise SynapseError( 502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED, ) - except: + except Exception as e: + logger.warn( + "Got Exception when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) raise SynapseError( 401, "No key for %s with id %s" % (server_name, key_ids), Codes.UNAUTHORIZED, ) + try: verify_signed_json(json_object, server_name, verify_key) except: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ca89a0787c..f131941f45 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,14 +19,18 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException, SynapseError +from synapse.api.errors import ( + CodeMessageException, HttpResponseException, SynapseError, +) from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +import itertools import logging +import random logger = logging.getLogger(__name__) @@ -440,21 +444,112 @@ class FederationClient(FederationBase): defer.returnValue(ret) @defer.inlineCallbacks - def get_missing_events(self, destination, room_id, earliest_events, + def get_missing_events(self, destination, room_id, earliest_events_ids, latest_events, limit, min_depth): - content = yield self.transport_layer.get_missing_events( - destination, room_id, earliest_events, latest_events, limit, - min_depth, - ) + """Tries to fetch events we are missing. This is called when we receive + an event without having received all of its ancestors. - events = [ - self.event_from_pdu_json(e) - for e in content.get("events", []) - ] + Args: + destination (str) + room_id (str) + earliest_events_ids (list): List of event ids. Effectively the + events we expected to receive, but haven't. `get_missing_events` + should only return events that didn't happen before these. + latest_events (list): List of events we have received that we don't + have all previous events for. + limit (int): Maximum number of events to return. + min_depth (int): Minimum depth of events tor return. + """ + try: + content = yield self.transport_layer.get_missing_events( + destination=destination, + room_id=room_id, + earliest_events=earliest_events_ids, + latest_events=[e.event_id for e in latest_events], + limit=limit, + min_depth=min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + have_gotten_all_from_destination = True + except HttpResponseException as e: + if not e.code == 400: + raise - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=True - ) + # We are probably hitting an old server that doesn't support + # get_missing_events + signed_events = [] + have_gotten_all_from_destination = False + + if len(signed_events) >= limit: + defer.returnValue(signed_events) + + servers = yield self.store.get_joined_hosts_for_room(room_id) + + servers = set(servers) + servers.discard(self.server_name) + + failed_to_fetch = set() + + while len(signed_events) < limit: + # Are we missing any? + + seen_events = set(earliest_events_ids) + seen_events.update(e.event_id for e in signed_events) + + missing_events = {} + for e in itertools.chain(latest_events, signed_events): + if e.depth > min_depth: + missing_events.update({ + e_id: e.depth for e_id, _ in e.prev_events + if e_id not in seen_events + and e_id not in failed_to_fetch + }) + + if not missing_events: + break + + have_seen = yield self.store.have_events(missing_events) + + for k in have_seen: + missing_events.pop(k, None) + + if not missing_events: + break + + # Okay, we haven't gotten everything yet. Lets get them. + ordered_missing = sorted(missing_events.items(), key=lambda x: x[0]) + + if have_gotten_all_from_destination: + servers.discard(destination) + + def random_server_list(): + srvs = list(servers) + random.shuffle(srvs) + return srvs + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ) + for e_id, depth in ordered_missing[:limit - len(signed_events)] + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in zip(res, ordered_missing): + if result: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) defer.returnValue(signed_events) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4264d857be..9c7dcdba96 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -413,12 +413,16 @@ class FederationServer(FederationBase): missing_events = yield self.get_missing_events( origin, pdu.room_id, - earliest_events=list(latest), - latest_events=[pdu.event_id], + earliest_events_ids=list(latest), + latest_events=[pdu], limit=10, min_depth=min_depth, ) + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + for e in missing_events: yield self._handle_new_pdu( origin, diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 8d5f5c8499..d3297b7292 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -23,6 +23,7 @@ from synapse.events.utils import serialize_event from ._base import BaseHandler import logging +import random logger = logging.getLogger(__name__) @@ -72,6 +73,14 @@ class EventStreamHandler(BaseHandler): rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) + if timeout: + # If they've set a timeout set a minimum limit. + timeout = max(timeout, 500) + + # Add some randomness to this value to try and mitigate against + # thundering herds on restart. + timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout diff --git a/synapse/http/server.py b/synapse/http/server.py index 74a101a5d7..767c3ef79b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -124,27 +124,29 @@ class JsonResource(HttpServer, resource.Resource): # and path regex match for path_entry in self.path_regexs.get(request.method, []): m = path_entry.pattern.match(request.path) - if m: - # We found a match! Trigger callback and then return the - # returned response. We pass both the request and any - # matched groups from the regex to the callback. - - args = [ - urllib.unquote(u).decode("UTF-8") for u in m.groups() - ] - - logger.info( - "Received request: %s %s", - request.method, request.path - ) - - code, response = yield path_entry.callback( - request, - *args - ) - - self._send_response(request, code, response) - return + if not m: + continue + + # We found a match! Trigger callback and then return the + # returned response. We pass both the request and any + # matched groups from the regex to the callback. + + args = [ + urllib.unquote(u).decode("UTF-8") for u in m.groups() + ] + + logger.info( + "Received request: %s %s", + request.method, request.path + ) + + code, response = yield path_entry.callback( + request, + *args + ) + + self._send_response(request, code, response) + return # Huh. No one wanted to handle that? Fiiiiiine. Send 400. raise UnrecognizedRequestError() diff --git a/synapse/notifier.py b/synapse/notifier.py index 09d23e79b8..df13e8ddb6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -63,7 +63,7 @@ class _NotificationListener(object): pass for room in self.rooms: - lst = notifier.rooms_to_listeners.get(room, set()) + lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) @@ -83,7 +83,7 @@ class Notifier(object): def __init__(self, hs): self.hs = hs - self.rooms_to_listeners = {} + self.room_to_listeners = {} self.user_to_listeners = {} self.appservice_to_listeners = {} @@ -116,17 +116,17 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.rooms_to_listeners.get(room_id, set()).copy() + listeners = self.room_to_listeners.get(room_id, set()).copy() for user in extra_users: listeners |= self.user_to_listeners.get(user, set()).copy() for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the rooms_to_listeners set, but + # App services will already be in the room_to_listeners set, but # that isn't enough. They need to be checked here in order to # receive *invites* for users they are interested in. Does this - # make the rooms_to_listeners check somewhat obselete? + # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): listeners |= self.appservice_to_listeners.get( appservice, set() @@ -184,7 +184,7 @@ class Notifier(object): listeners |= self.user_to_listeners.get(user, set()).copy() for room in rooms: - listeners |= self.rooms_to_listeners.get(room, set()).copy() + listeners |= self.room_to_listeners.get(room, set()).copy() @defer.inlineCallbacks def notify(listener): @@ -337,7 +337,7 @@ class Notifier(object): @log_function def _register_with_keys(self, listener): for room in listener.rooms: - s = self.rooms_to_listeners.setdefault(room, set()) + s = self.room_to_listeners.setdefault(room, set()) s.add(listener) self.user_to_listeners.setdefault(listener.user, set()).add(listener) @@ -380,5 +380,5 @@ class Notifier(object): def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set()) - listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index a45c673d32..f115f50e50 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -88,11 +88,15 @@ class LruCache(object): else: return default + def cache_len(): + return len(cache) + self.sentinel = object() self.get = cache_get self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop + self.len = cache_len def __getitem__(self, key): result = self.get(key, self.sentinel) @@ -108,3 +112,6 @@ class LruCache(object): result = self.pop(key, self.sentinel) if result is self.sentinel: raise KeyError() + + def __len__(self): + return self.len() |