summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/federation/federation_base.py3
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/federation/federation_server.py88
-rw-r--r--synapse/federation/replication.py2
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/federation.py30
-rw-r--r--synapse/http/client.py28
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py11
-rw-r--r--synapse/types.py5
10 files changed, 112 insertions, 61 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index dc211e9637..faaa86d972 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.16.0"
+__version__ = "0.16.1-rc1"
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a0b7cb7963..da2f5e8cfd 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -31,6 +31,9 @@ logger = logging.getLogger(__name__)
 
 
 class FederationBase(object):
+    def __init__(self, hs):
+        pass
+
     @defer.inlineCallbacks
     def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
                                        include_none=False):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d835c1b038..b06387051c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 
 
 class FederationClient(FederationBase):
+    def __init__(self, hs):
+        super(FederationClient, self).__init__(hs)
 
     def start_get_pdu_cache(self):
         self._get_pdu_cache = ExpiringCache(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9f2a64dede..fe92457ba1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
+from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 import synapse.metrics
@@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[
 
 
 class FederationServer(FederationBase):
+    def __init__(self, hs):
+        super(FederationServer, self).__init__(hs)
+
+        self._room_pdu_linearizer = Linearizer()
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -491,43 +497,51 @@ class FederationServer(FederationBase):
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
                 if get_missing and prevs - seen:
-                    latest = yield self.store.get_latest_event_ids_in_room(
-                        pdu.room_id
-                    )
-
-                    # We add the prev events that we have seen to the latest
-                    # list to ensure the remote server doesn't give them to us
-                    latest = set(latest)
-                    latest |= seen
-
-                    logger.info(
-                        "Missing %d events for room %r: %r...",
-                        len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-                    )
-
-                    missing_events = yield self.get_missing_events(
-                        origin,
-                        pdu.room_id,
-                        earliest_events_ids=list(latest),
-                        latest_events=[pdu],
-                        limit=10,
-                        min_depth=min_depth,
-                    )
-
-                    # We want to sort these by depth so we process them and
-                    # tell clients about them in order.
-                    missing_events.sort(key=lambda x: x.depth)
-
-                    for e in missing_events:
-                        yield self._handle_new_pdu(
-                            origin,
-                            e,
-                            get_missing=False
-                        )
-
-                    have_seen = yield self.store.have_events(
-                        [ev for ev, _ in pdu.prev_events]
-                    )
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        # We recalculate seen, since it may have changed.
+                        have_seen = yield self.store.have_events(prevs)
+                        seen = set(have_seen.keys())
+
+                        if prevs - seen:
+                            latest = yield self.store.get_latest_event_ids_in_room(
+                                pdu.room_id
+                            )
+
+                            # We add the prev events that we have seen to the latest
+                            # list to ensure the remote server doesn't give them to us
+                            latest = set(latest)
+                            latest |= seen
+
+                            logger.info(
+                                "Missing %d events for room %r: %r...",
+                                len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                            )
+
+                            missing_events = yield self.get_missing_events(
+                                origin,
+                                pdu.room_id,
+                                earliest_events_ids=list(latest),
+                                latest_events=[pdu],
+                                limit=10,
+                                min_depth=min_depth,
+                            )
+
+                            # We want to sort these by depth so we process them and
+                            # tell clients about them in order.
+                            missing_events.sort(key=lambda x: x.depth)
+
+                            for e in missing_events:
+                                yield self._handle_new_pdu(
+                                    origin,
+                                    e,
+                                    get_missing=False
+                                )
+
+                            have_seen = yield self.store.have_events(
+                                [ev for ev, _ in pdu.prev_events]
+                            )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
             seen = set(have_seen.keys())
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 3e062a5eab..ea66a5dcbc 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
 
         self.hs = hs
 
+        super(ReplicationLayer, self).__init__(hs)
+
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 200793b5ed..b38f81e999 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -626,6 +626,6 @@ class AuthHandler(BaseHandler):
             Whether self.hash(password) == stored_hash (bool).
         """
         if stored_hash:
-            return bcrypt.hashpw(password, stored_hash) == stored_hash
+            return bcrypt.hashpw(password, stored_hash.encode('utf-8')) == stored_hash
         else:
             return False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ff83c608e7..c2df43e2f6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -345,19 +345,21 @@ class FederationHandler(BaseHandler):
         )
 
         missing_auth = required_auth - set(auth_events)
-        results = yield defer.gatherResults(
-            [
-                self.replication_layer.get_pdu(
-                    [dest],
-                    event_id,
-                    outlier=True,
-                    timeout=10000,
-                )
-                for event_id in missing_auth
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError)
-        auth_events.update({a.event_id: a for a in results})
+        if missing_auth:
+            logger.info("Missing auth for backfill: %r", missing_auth)
+            results = yield defer.gatherResults(
+                [
+                    self.replication_layer.get_pdu(
+                        [dest],
+                        event_id,
+                        outlier=True,
+                        timeout=10000,
+                    )
+                    for event_id in missing_auth
+                ],
+                consumeErrors=True
+            ).addErrback(unwrapFirstError)
+            auth_events.update({a.event_id: a for a in results})
 
         ev_infos = []
         for a in auth_events.values():
@@ -399,7 +401,7 @@ class FederationHandler(BaseHandler):
             # previous to work out the state.
             # TODO: We can probably do something more clever here.
             yield self._handle_new_event(
-                dest, event
+                dest, event, backfilled=True,
             )
 
         defer.returnValue(events)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index c7fa692435..3ec9bc7faf 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -24,12 +24,13 @@ from synapse.http.endpoint import SpiderEndpoint
 
 from canonicaljson import encode_canonical_json
 
-from twisted.internet import defer, reactor, ssl, protocol
+from twisted.internet import defer, reactor, ssl, protocol, task
 from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
 from twisted.web.client import (
     BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent,
-    readBody, FileBodyProducer, PartialDownloadError,
+    readBody, PartialDownloadError,
 )
+from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.http import PotentialDataLoss
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
@@ -468,3 +469,26 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
+
+
+class FileBodyProducer(TwistedFileBodyProducer):
+    """Workaround for https://twistedmatrix.com/trac/ticket/8473
+
+    We override the pauseProducing and resumeProducing methods in twisted's
+    FileBodyProducer so that they do not raise exceptions if the task has
+    already completed.
+    """
+
+    def pauseProducing(self):
+        try:
+            super(FileBodyProducer, self).pauseProducing()
+        except task.TaskDone:
+            # task has already completed
+            pass
+
+    def resumeProducing(self):
+        try:
+            super(FileBodyProducer, self).resumeProducing()
+        except task.NotPaused:
+            # task was not paused (probably because it had already completed)
+            pass
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 37dd1de899..74c64f1371 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -252,7 +252,8 @@ class PreviewUrlResource(Resource):
 
         og = {}
         for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
-            og[tag.attrib['property']] = tag.attrib['content']
+            if 'content' in tag.attrib:
+                og[tag.attrib['property']] = tag.attrib['content']
 
         # TODO: grab article: meta tags too, e.g.:
 
@@ -279,7 +280,7 @@ class PreviewUrlResource(Resource):
                 # TODO: consider inlined CSS styles as well as width & height attribs
                 images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
                 images = sorted(images, key=lambda i: (
-                    -1 * int(i.attrib['width']) * int(i.attrib['height'])
+                    -1 * float(i.attrib['width']) * float(i.attrib['height'])
                 ))
                 if not images:
                     images = tree.xpath("//img[@src]")
@@ -287,9 +288,9 @@ class PreviewUrlResource(Resource):
                     og['og:image'] = images[0].attrib['src']
 
         # pre-cache the image for posterity
-        # FIXME: it might be cleaner to use the same flow as the main /preview_url request
-        # itself and benefit from the same caching etc.  But for now we just rely on the
-        # caching on the master request to speed things up.
+        # FIXME: it might be cleaner to use the same flow as the main /preview_url
+        # request itself and benefit from the same caching etc.  But for now we
+        # just rely on the caching on the master request to speed things up.
         if 'og:image' in og and og['og:image']:
             image_info = yield self._download_url(
                 self._rebase_url(og['og:image'], media_info['uri']), requester.user
diff --git a/synapse/types.py b/synapse/types.py
index 7b6ae44bdd..f639651a73 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -22,7 +22,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
 
 
 def get_domain_from_id(string):
-    return string.split(":", 1)[1]
+    try:
+        return string.split(":", 1)[1]
+    except IndexError:
+        raise SynapseError(400, "Invalid ID: %r", string)
 
 
 class DomainSpecificString(