diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4e4892d40b..b042d4eed9 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,7 +30,6 @@ class ServerConfig(Config):
self.pid_file = self.abspath(args.pid_file)
self.webclient = True
self.manhole = args.manhole
- self.no_tls = args.no_tls
self.soft_file_limit = args.soft_file_limit
if not args.content_addr:
@@ -76,8 +75,6 @@ class ServerConfig(Config):
server_group.add_argument("--content-addr", default=None,
help="The host and scheme to use for the "
"content repository")
- server_group.add_argument("--no-tls", action='store_true',
- help="Don't bind to the https port.")
server_group.add_argument("--soft-file-limit", type=int, default=0,
help="Set the soft limit on the number of "
"file descriptors synapse can use. "
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 384b29e7ba..034f9a7bf0 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -28,9 +28,16 @@ class TlsConfig(Config):
self.tls_certificate = self.read_tls_certificate(
args.tls_certificate_path
)
- self.tls_private_key = self.read_tls_private_key(
- args.tls_private_key_path
- )
+
+ self.no_tls = args.no_tls
+
+ if self.no_tls:
+ self.tls_private_key = None
+ else:
+ self.tls_private_key = self.read_tls_private_key(
+ args.tls_private_key_path
+ )
+
self.tls_dh_params_path = self.check_file(
args.tls_dh_params_path, "tls_dh_params"
)
@@ -45,6 +52,8 @@ class TlsConfig(Config):
help="PEM encoded private key for TLS")
tls_group.add_argument("--tls-dh-params-path",
help="PEM dh parameters for ephemeral keys")
+ tls_group.add_argument("--no-tls", action='store_true',
+ help="Don't bind to the https port.")
def read_tls_certificate(self, cert_path):
cert_pem = self.read_file(cert_path, "tls_certificate")
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 24d4abf3e9..2f8618a0df 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -38,7 +38,10 @@ class ServerContextFactory(ssl.ContextFactory):
logger.exception("Failed to enable eliptic curve for TLS")
context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
context.use_certificate(config.tls_certificate)
- context.use_privatekey(config.tls_private_key)
+
+ if not config.no_tls:
+ context.use_privatekey(config.tls_private_key)
+
context.load_tmp_dh(config.tls_dh_params_path)
context.set_cipher_list("!ADH:HIGH+kEDH:!AECDH:HIGH+kEECDH")
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 828aced44a..f4db7b8a05 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -50,18 +50,27 @@ class Keyring(object):
)
try:
verify_key = yield self.get_server_verify_key(server_name, key_ids)
- except IOError:
+ except IOError as e:
+ logger.warn(
+ "Got IOError when downloading keys for %s: %s %s",
+ server_name, type(e).__name__, str(e.message),
+ )
raise SynapseError(
502,
"Error downloading keys for %s" % (server_name,),
Codes.UNAUTHORIZED,
)
- except:
+ except Exception as e:
+ logger.warn(
+ "Got Exception when downloading keys for %s: %s %s",
+ server_name, type(e).__name__, str(e.message),
+ )
raise SynapseError(
401,
"No key for %s with id %s" % (server_name, key_ids),
Codes.UNAUTHORIZED,
)
+
try:
verify_signed_json(json_object, server_name, verify_key)
except:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ca89a0787c..f131941f45 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,14 +19,18 @@ from twisted.internet import defer
from .federation_base import FederationBase
from .units import Edu
-from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.api.errors import (
+ CodeMessageException, HttpResponseException, SynapseError,
+)
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+import itertools
import logging
+import random
logger = logging.getLogger(__name__)
@@ -440,21 +444,112 @@ class FederationClient(FederationBase):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_missing_events(self, destination, room_id, earliest_events,
+ def get_missing_events(self, destination, room_id, earliest_events_ids,
latest_events, limit, min_depth):
- content = yield self.transport_layer.get_missing_events(
- destination, room_id, earliest_events, latest_events, limit,
- min_depth,
- )
+ """Tries to fetch events we are missing. This is called when we receive
+ an event without having received all of its ancestors.
- events = [
- self.event_from_pdu_json(e)
- for e in content.get("events", [])
- ]
+ Args:
+ destination (str)
+ room_id (str)
+ earliest_events_ids (list): List of event ids. Effectively the
+ events we expected to receive, but haven't. `get_missing_events`
+ should only return events that didn't happen before these.
+ latest_events (list): List of events we have received that we don't
+ have all previous events for.
+ limit (int): Maximum number of events to return.
+            min_depth (int): Minimum depth of events to return.
+ """
+ try:
+ content = yield self.transport_layer.get_missing_events(
+ destination=destination,
+ room_id=room_id,
+ earliest_events=earliest_events_ids,
+ latest_events=[e.event_id for e in latest_events],
+ limit=limit,
+ min_depth=min_depth,
+ )
+
+ events = [
+ self.event_from_pdu_json(e)
+ for e in content.get("events", [])
+ ]
+
+ signed_events = yield self._check_sigs_and_hash_and_fetch(
+ destination, events, outlier=True
+ )
+
+ have_gotten_all_from_destination = True
+ except HttpResponseException as e:
+ if not e.code == 400:
+ raise
- signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=True
- )
+ # We are probably hitting an old server that doesn't support
+ # get_missing_events
+ signed_events = []
+ have_gotten_all_from_destination = False
+
+ if len(signed_events) >= limit:
+ defer.returnValue(signed_events)
+
+ servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+ servers = set(servers)
+ servers.discard(self.server_name)
+
+ failed_to_fetch = set()
+
+ while len(signed_events) < limit:
+ # Are we missing any?
+
+ seen_events = set(earliest_events_ids)
+ seen_events.update(e.event_id for e in signed_events)
+
+ missing_events = {}
+ for e in itertools.chain(latest_events, signed_events):
+ if e.depth > min_depth:
+ missing_events.update({
+ e_id: e.depth for e_id, _ in e.prev_events
+ if e_id not in seen_events
+ and e_id not in failed_to_fetch
+ })
+
+ if not missing_events:
+ break
+
+ have_seen = yield self.store.have_events(missing_events)
+
+ for k in have_seen:
+ missing_events.pop(k, None)
+
+ if not missing_events:
+ break
+
+ # Okay, we haven't gotten everything yet. Lets get them.
+ ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+ if have_gotten_all_from_destination:
+ servers.discard(destination)
+
+ def random_server_list():
+ srvs = list(servers)
+ random.shuffle(srvs)
+ return srvs
+
+ deferreds = [
+ self.get_pdu(
+ destinations=random_server_list(),
+ event_id=e_id,
+ )
+ for e_id, depth in ordered_missing[:limit - len(signed_events)]
+ ]
+
+ res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ for (result, val), (e_id, _) in zip(res, ordered_missing):
+ if result:
+ signed_events.append(val)
+ else:
+ failed_to_fetch.add(e_id)
defer.returnValue(signed_events)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4264d857be..9c7dcdba96 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -413,12 +413,16 @@ class FederationServer(FederationBase):
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
- earliest_events=list(latest),
- latest_events=[pdu.event_id],
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
limit=10,
min_depth=min_depth,
)
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
for e in missing_events:
yield self._handle_new_pdu(
origin,
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 8d5f5c8499..d3297b7292 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -23,6 +23,7 @@ from synapse.events.utils import serialize_event
from ._base import BaseHandler
import logging
+import random
logger = logging.getLogger(__name__)
@@ -72,6 +73,14 @@ class EventStreamHandler(BaseHandler):
rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
+ if timeout:
+ # If they've set a timeout set a minimum limit.
+ timeout = max(timeout, 500)
+
+ # Add some randomness to this value to try and mitigate against
+ # thundering herds on restart.
+ timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+
with PreserveLoggingContext():
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 74a101a5d7..767c3ef79b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -124,27 +124,29 @@ class JsonResource(HttpServer, resource.Resource):
# and path regex match
for path_entry in self.path_regexs.get(request.method, []):
m = path_entry.pattern.match(request.path)
- if m:
- # We found a match! Trigger callback and then return the
- # returned response. We pass both the request and any
- # matched groups from the regex to the callback.
-
- args = [
- urllib.unquote(u).decode("UTF-8") for u in m.groups()
- ]
-
- logger.info(
- "Received request: %s %s",
- request.method, request.path
- )
-
- code, response = yield path_entry.callback(
- request,
- *args
- )
-
- self._send_response(request, code, response)
- return
+ if not m:
+ continue
+
+ # We found a match! Trigger callback and then return the
+ # returned response. We pass both the request and any
+ # matched groups from the regex to the callback.
+
+ args = [
+ urllib.unquote(u).decode("UTF-8") for u in m.groups()
+ ]
+
+ logger.info(
+ "Received request: %s %s",
+ request.method, request.path
+ )
+
+ code, response = yield path_entry.callback(
+ request,
+ *args
+ )
+
+ self._send_response(request, code, response)
+ return
# Huh. No one wanted to handle that? Fiiiiiine. Send 400.
raise UnrecognizedRequestError()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 09d23e79b8..df13e8ddb6 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -63,7 +63,7 @@ class _NotificationListener(object):
pass
for room in self.rooms:
- lst = notifier.rooms_to_listeners.get(room, set())
+ lst = notifier.room_to_listeners.get(room, set())
lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self)
@@ -83,7 +83,7 @@ class Notifier(object):
def __init__(self, hs):
self.hs = hs
- self.rooms_to_listeners = {}
+ self.room_to_listeners = {}
self.user_to_listeners = {}
self.appservice_to_listeners = {}
@@ -116,17 +116,17 @@ class Notifier(object):
room_source = self.event_sources.sources["room"]
- listeners = self.rooms_to_listeners.get(room_id, set()).copy()
+ listeners = self.room_to_listeners.get(room_id, set()).copy()
for user in extra_users:
listeners |= self.user_to_listeners.get(user, set()).copy()
for appservice in self.appservice_to_listeners:
# TODO (kegan): Redundant appservice listener checks?
- # App services will already be in the rooms_to_listeners set, but
+ # App services will already be in the room_to_listeners set, but
# that isn't enough. They need to be checked here in order to
# receive *invites* for users they are interested in. Does this
- # make the rooms_to_listeners check somewhat obselete?
+            # make the room_to_listeners check somewhat obsolete?
if appservice.is_interested(event):
listeners |= self.appservice_to_listeners.get(
appservice, set()
@@ -184,7 +184,7 @@ class Notifier(object):
listeners |= self.user_to_listeners.get(user, set()).copy()
for room in rooms:
- listeners |= self.rooms_to_listeners.get(room, set()).copy()
+ listeners |= self.room_to_listeners.get(room, set()).copy()
@defer.inlineCallbacks
def notify(listener):
@@ -337,7 +337,7 @@ class Notifier(object):
@log_function
def _register_with_keys(self, listener):
for room in listener.rooms:
- s = self.rooms_to_listeners.setdefault(room, set())
+ s = self.room_to_listeners.setdefault(room, set())
s.add(listener)
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
@@ -380,5 +380,5 @@ class Notifier(object):
def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set())
- listeners = self.rooms_to_listeners.setdefault(room_id, set())
+ listeners = self.room_to_listeners.setdefault(room_id, set())
listeners |= new_listeners
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index a45c673d32..f115f50e50 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -88,11 +88,15 @@ class LruCache(object):
else:
return default
+ def cache_len():
+ return len(cache)
+
self.sentinel = object()
self.get = cache_get
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
+ self.len = cache_len
def __getitem__(self, key):
result = self.get(key, self.sentinel)
@@ -108,3 +112,6 @@ class LruCache(object):
result = self.pop(key, self.sentinel)
if result is self.sentinel:
raise KeyError()
+
+ def __len__(self):
+ return self.len()
|