summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-04-30 16:54:55 +0100
committerMark Haines <mark.haines@matrix.org>2015-04-30 16:54:55 +0100
commit2d4d2bbae4f35e3128fa492e45cfc5b0750524bc (patch)
tree8e3fa03a60443dc2a1af80d5fb13aa0e28ffddca
parentWrite a default log_config when generating config (diff)
parentAdd get_rooms_for_user cache (diff)
downloadsynapse-2d4d2bbae4f35e3128fa492e45cfc5b0750524bc.tar.xz
Merge branch 'develop' into markjh/config_cleanup
Conflicts:
	synapse/config/captcha.py
-rwxr-xr-xsynapse/app/homeserver.py39
-rw-r--r--synapse/config/captcha.py1
-rw-r--r--synapse/handlers/identity.py19
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/http/server.py15
-rw-r--r--synapse/push/pusherpool.py8
-rw-r--r--synapse/storage/appservice.py20
-rw-r--r--synapse/storage/keys.py8
-rw-r--r--synapse/storage/pusher.py54
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py15
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/schema/delta/17/user_threepids.sql9
-rw-r--r--synapse/storage/state.py21
-rw-r--r--synapse/storage/util/id_generators.py44
16 files changed, 176 insertions, 88 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e6a34561c1..d8d0df7e41 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -35,6 +35,7 @@ from twisted.enterprise import adbapi
 from twisted.web.resource import Resource
 from twisted.web.static import File
 from twisted.web.server import Site
+from twisted.web.http import proxiedLogFormatter, combinedLogFormatter
 from synapse.http.server import JsonResource, RootRedirect
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -228,7 +229,11 @@ class SynapseHomeServer(HomeServer):
         if not config.no_tls and config.bind_port is not None:
             reactor.listenSSL(
                 config.bind_port,
-                Site(self.root_resource),
+                SynapseSite(
+                    "synapse.access.https",
+                    config,
+                    self.root_resource,
+                ),
                 self.tls_context_factory,
                 interface=config.bind_host
             )
@@ -237,7 +242,11 @@ class SynapseHomeServer(HomeServer):
         if config.unsecure_port is not None:
             reactor.listenTCP(
                 config.unsecure_port,
-                Site(self.root_resource),
+                SynapseSite(
+                    "synapse.access.http",
+                    config,
+                    self.root_resource,
+                ),
                 interface=config.bind_host
             )
             logger.info("Synapse now listening on port %d", config.unsecure_port)
@@ -245,7 +254,13 @@ class SynapseHomeServer(HomeServer):
         metrics_resource = self.get_resource_for_metrics()
         if metrics_resource and config.metrics_port is not None:
             reactor.listenTCP(
-                config.metrics_port, Site(metrics_resource), interface="127.0.0.1",
+                config.metrics_port,
+                SynapseSite(
+                    "synapse.access.metrics",
+                    config,
+                    metrics_resource,
+                ),
+                interface="127.0.0.1",
             )
             logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
 
@@ -462,6 +477,24 @@ class SynapseService(service.Service):
         return self._port.stopListening()
 
 
+class SynapseSite(Site):
+    """
+    Subclass of a twisted http Site that does access logging with python's
+    standard logging
+    """
+    def __init__(self, logger_name, config, resource, *args, **kwargs):
+        Site.__init__(self, resource, *args, **kwargs)
+        if config.captcha_ip_origin_is_x_forwarded:
+            self._log_formatter = proxiedLogFormatter
+        else:
+            self._log_formatter = combinedLogFormatter
+        self.access_logger = logging.getLogger(logger_name)
+
+    def log(self, request):
+        line = self._log_formatter(self._logDateTime, request)
+        self.access_logger.info(line)
+
+
 def run(hs):
 
     def in_thread():
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index ba7037aeb1..ee5c238314 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -21,6 +21,7 @@ class CaptchaConfig(Config):
         self.recaptcha_private_key = config["recaptcha_private_key"]
         self.recaptcha_public_key = config["recaptcha_public_key"]
         self.enable_registration_captcha = config["enable_registration_captcha"]
+        # XXX: This is used for more than just captcha
         self.captcha_ip_origin_is_x_forwarded = (
             config["captcha_ip_origin_is_x_forwarded"]
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index cbb3184ca5..6200e10775 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -87,14 +87,29 @@ class IdentityHandler(BaseHandler):
         logger.debug("binding threepid %r to %s", creds, mxid)
         http_client = SimpleHttpClient(self.hs)
         data = None
+
+        if 'id_server' in creds:
+            id_server = creds['id_server']
+        elif 'idServer' in creds:
+            id_server = creds['idServer']
+        else:
+            raise SynapseError(400, "No id_server in creds")
+
+        if 'client_secret' in creds:
+            client_secret = creds['client_secret']
+        elif 'clientSecret' in creds:
+            client_secret = creds['clientSecret']
+        else:
+            raise SynapseError(400, "No client_secret in creds")
+
         try:
             data = yield http_client.post_urlencoded_get_json(
                 "https://%s%s" % (
-                    creds['id_server'], "/_matrix/identity/api/v1/3pid/bind"
+                    id_server, "/_matrix/identity/api/v1/3pid/bind"
                 ),
                 {
                     'sid': creds['sid'],
-                    'client_secret': creds['client_secret'],
+                    'client_secret': client_secret,
                     'mxid': mxid,
                 }
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9667bb8674..22e19af17f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -267,8 +267,7 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("presence"), None
         )
 
-        public_rooms = yield self.store.get_rooms(is_public=True)
-        public_room_ids = [r["room_id"] for r in public_rooms]
+        public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
         if limit is None:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 42cd528908..9e15610401 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -881,7 +881,7 @@ class PresenceEventSource(object):
         # TODO(paul): limit
 
         for serial, user_ids in presence._remote_offline_serials:
-            if serial < from_key:
+            if serial <= from_key:
                 break
 
             if serial > max_serial:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 05636e683b..93ecbd7589 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -24,7 +24,7 @@ from syutil.jsonutil import (
     encode_canonical_json, encode_pretty_printed_json
 )
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.web import server, resource
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.util import redirectTo
@@ -179,19 +179,6 @@ class JsonResource(HttpServer, resource.Resource):
             self._PathEntry(path_pattern, callback)
         )
 
-    def start_listening(self, port):
-        """ Registers the http server with the twisted reactor.
-
-        Args:
-            port (int): The port to listen on.
-
-        """
-        reactor.listenTCP(
-            port,
-            server.Site(self),
-            interface=self.hs.config.bind_host
-        )
-
     def render(self, request):
         """ This gets called by twisted every time someone sends us a request.
         """
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 58fe6ad747..0ab2f65972 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -170,7 +170,11 @@ class PusherPool:
     def _start_pushers(self, pushers):
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
-            p = self._create_pusher(pusherdict)
+            try:
+                p = self._create_pusher(pusherdict)
+            except PusherConfigException:
+                logger.exception("Couldn't start a pusher: caught PusherConfigException")
+                continue
             if p:
                 fullid = "%s:%s:%s" % (
                     pusherdict['app_id'],
@@ -182,6 +186,8 @@ class PusherPool:
                 self.pushers[fullid] = p
                 p.start()
 
+        logger.info("Started pushers")
+
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_name):
         fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 63d1af4e86..39b7881c40 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -355,11 +355,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         # being sent)
         last_txn_id = self._get_last_txn(txn, service.id)
 
-        result = txn.execute(
+        txn.execute(
             "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
             (service.id,)
         )
-        highest_txn_id = result.fetchone()[0]
+        highest_txn_id = txn.fetchone()[0]
         if highest_txn_id is None:
             highest_txn_id = 0
 
@@ -441,15 +441,17 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
     def _get_oldest_unsent_txn(self, txn, service):
         # Monotonically increasing txn ids, so just select the smallest
         # one in the txns table (we delete them when they are sent)
-        result = txn.execute(
-            "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
+        txn.execute(
+            "SELECT * FROM application_services_txns WHERE as_id=?"
+            " ORDER BY txn_id ASC LIMIT 1",
             (service.id,)
         )
-        entry = self.cursor_to_dict(result)[0]
-        if not entry or entry["txn_id"] is None:
-            # the min(txn_id) part will force a row, so entry may not be None
+        rows = self.cursor_to_dict(txn)
+        if not rows:
             return None
 
+        entry = rows[0]
+
         event_ids = json.loads(entry["event_ids"])
         events = self._get_events_txn(txn, event_ids)
 
@@ -458,11 +460,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         )
 
     def _get_last_txn(self, txn, service_id):
-        result = txn.execute(
+        txn.execute(
             "SELECT last_txn FROM application_services_state WHERE as_id=?",
             (service_id,)
         )
-        last_txn_id = result.fetchone()
+        last_txn_id = txn.fetchone()
         if last_txn_id is None or last_txn_id[0] is None:  # no row exists
             return 0
         else:
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index cbe9339ccf..5bdf497b93 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -137,8 +137,13 @@ class KeyStore(SQLBaseStore):
             ts_valid_until_ms (int): The time when this json stops being valid.
             key_json (bytes): The encoded JSON.
         """
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_keys_json",
+            keyvalues={
+                "server_name": server_name,
+                "key_id": key_id,
+                "from_server": from_server,
+            },
             values={
                 "server_name": server_name,
                 "key_id": key_id,
@@ -147,7 +152,6 @@ class KeyStore(SQLBaseStore):
                 "ts_valid_until_ms": ts_expires_ms,
                 "key_json": buffer(key_json_bytes),
             },
-            or_replace=True,
         )
 
     def get_server_keys_json(self, server_keys):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2582a1da66..08ea62681b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -21,34 +21,62 @@ from synapse.api.errors import StoreError
 from syutil.jsonutil import encode_canonical_json
 
 import logging
+import simplejson as json
+import types
 
 logger = logging.getLogger(__name__)
 
 
 class PusherStore(SQLBaseStore):
+    def _decode_pushers_rows(self, rows):
+        for r in rows:
+            dataJson = r['data']
+            r['data'] = None
+            try:
+                if isinstance(dataJson, types.BufferType):
+                    dataJson = str(dataJson).decode("UTF8")
+
+                r['data'] = json.loads(dataJson)
+            except Exception as e:
+                logger.warn(
+                    "Invalid JSON in data for pusher %d: %s, %s",
+                    r['id'], dataJson, e.message,
+                )
+                pass
+
+            if isinstance(r['pushkey'], types.BufferType):
+                r['pushkey'] = str(r['pushkey']).decode("UTF8")
+
+        return rows
+
     @defer.inlineCallbacks
     def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        sql = (
-            "SELECT * FROM pushers "
-            "WHERE app_id = ? AND pushkey = ?"
-        )
+        def r(txn):
+            sql = (
+                "SELECT * FROM pushers"
+                " WHERE app_id = ? AND pushkey = ?"
+            )
 
-        rows = yield self._execute_and_decode(
-            "get_pushers_by_app_id_and_pushkey",
-            sql,
-            app_id, pushkey
+            txn.execute(sql, (app_id, pushkey,))
+            rows = self.cursor_to_dict(txn)
+
+            return self._decode_pushers_rows(rows)
+
+        rows = yield self.runInteraction(
+            "get_pushers_by_app_id_and_pushkey", r
         )
 
         defer.returnValue(rows)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
-        sql = (
-            "SELECT * FROM pushers"
-        )
+        def get_pushers(txn):
+            txn.execute("SELECT * FROM pushers")
+            rows = self.cursor_to_dict(txn)
 
-        rows = yield self._execute_and_decode("get_all_pushers", sql)
+            return self._decode_pushers_rows(rows)
 
+        rows = yield self.runInteraction("get_all_pushers", get_pushers)
         defer.returnValue(rows)
 
     @defer.inlineCallbacks
@@ -72,7 +100,7 @@ class PusherStore(SQLBaseStore):
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
-                    data=encode_canonical_json(data).decode("UTF-8"),
+                    data=encode_canonical_json(data),
                 ),
                 insertion_values=dict(
                     id=next_id,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index a986c4816e..026ba217d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -181,7 +181,7 @@ class RegistrationStore(SQLBaseStore):
     @defer.inlineCallbacks
     def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
         yield self._simple_upsert("user_threepids", {
-            "user": user_id,
+            "user_id": user_id,
             "medium": medium,
             "address": address,
         }, {
@@ -193,7 +193,7 @@ class RegistrationStore(SQLBaseStore):
     def user_get_threepids(self, user_id):
         ret = yield self._simple_select_list(
             "user_threepids", {
-                "user": user_id
+                "user_id": user_id
             },
             ['medium', 'address', 'validated_at', 'added_at'],
             'user_get_threepids'
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 48ebb33057..78572bbdd2 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -75,6 +75,16 @@ class RoomStore(SQLBaseStore):
             allow_none=True,
         )
 
+    def get_public_room_ids(self):
+        return self._simple_select_onecol(
+            table="rooms",
+            keyvalues={
+                "is_public": True,
+            },
+            retcol="room_id",
+            desc="get_public_room_ids",
+        )
+
     @defer.inlineCallbacks
     def get_rooms(self, is_public):
         """Retrieve a list of all public rooms.
@@ -186,14 +196,13 @@ class RoomStore(SQLBaseStore):
         sql = (
             "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
             "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
             "WHERE c.room_id = ? "
         ) % {
             "redacted": del_sql,
         }
 
-        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
-        sql += " OR s.type = 'm.room.aliases')"
+        sql += " AND ((c.type = 'm.room.name' AND c.state_key = '')"
+        sql += " OR c.type = 'm.room.aliases')"
         args = (room_id,)
 
         results = yield self._execute_and_decode("get_current_state", sql, *args)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8ea5756d61..831169e220 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -65,6 +65,7 @@ class RoomMemberStore(SQLBaseStore):
         )
 
         self.get_rooms_for_user.invalidate(target_user_id)
+        self.get_joined_hosts_for_room.invalidate(event.room_id)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
@@ -162,6 +163,7 @@ class RoomMemberStore(SQLBaseStore):
             RoomsForUser(**r) for r in self.cursor_to_dict(txn)
         ]
 
+    @cached()
     def get_joined_hosts_for_room(self, room_id):
         return self.runInteraction(
             "get_joined_hosts_for_room",
diff --git a/synapse/storage/schema/delta/17/user_threepids.sql b/synapse/storage/schema/delta/17/user_threepids.sql
new file mode 100644
index 0000000000..c17715ac80
--- /dev/null
+++ b/synapse/storage/schema/delta/17/user_threepids.sql
@@ -0,0 +1,9 @@
+CREATE TABLE user_threepids (
+    user_id TEXT NOT NULL,
+    medium TEXT NOT NULL,
+    address TEXT NOT NULL,
+    validated_at BIGINT NOT NULL,
+    added_at BIGINT NOT NULL,
+    CONSTRAINT user_medium_address UNIQUE (user_id, medium, address)
+);
+CREATE INDEX user_threepids_user_id ON user_threepids(user_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 553ba9dd1f..95bc15c0dc 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -128,25 +128,18 @@ class StateStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
-        del_sql = (
-            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
-            "LIMIT 1"
-        )
-
         sql = (
-            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
-            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
-            "INNER JOIN state_events as s ON e.event_id = s.event_id "
-            "WHERE c.room_id = ? "
-        ) % {
-            "redacted": del_sql,
-        }
+            "SELECT e.*, r.event_id FROM events as e"
+            " LEFT JOIN redactions as r ON r.redacts = e.event_id"
+            " INNER JOIN current_state_events as c ON e.event_id = c.event_id"
+            " WHERE c.room_id = ? "
+        )
 
         if event_type and state_key is not None:
-            sql += " AND s.type = ? AND s.state_key = ? "
+            sql += " AND c.type = ? AND c.state_key = ? "
             args = (room_id, event_type, state_key)
         elif event_type:
-            sql += " AND s.type = ?"
+            sql += " AND c.type = ?"
             args = (room_id, event_type)
         else:
             args = (room_id, )
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 9d461d5e96..e40eb8a8c4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,15 +30,13 @@ class IdGenerator(object):
 
     @defer.inlineCallbacks
     def get_next(self):
-        with self._lock:
-            if not self._next_id:
-                res = yield self.store._execute_and_decode(
-                    "IdGenerator_%s" % (self.table,),
-                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
-                )
-
-                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+        if self._next_id is None:
+            yield self.store.runInteraction(
+                "IdGenerator_%s" % (self.table,),
+                self.get_next_txn,
+            )
 
+        with self._lock:
             i = self._next_id
             self._next_id += 1
             defer.returnValue(i)
@@ -86,10 +84,10 @@ class StreamIdGenerator(object):
             with stream_id_gen.get_next_txn(txn) as stream_id:
                 # ... persist event ...
         """
-        with self._lock:
-            if not self._current_max:
-                self._compute_current_max(txn)
+        if not self._current_max:
+            self._get_or_compute_current_max(txn)
 
+        with self._lock:
             self._current_max += 1
             next_id = self._current_max
 
@@ -110,22 +108,24 @@ class StreamIdGenerator(object):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
+        if not self._current_max:
+            yield store.runInteraction(
+                "_compute_current_max",
+                self._get_or_compute_current_max,
+            )
+
         with self._lock:
             if self._unfinished_ids:
                 defer.returnValue(self._unfinished_ids[0] - 1)
 
-            if not self._current_max:
-                yield store.runInteraction(
-                    "_compute_current_max",
-                    self._compute_current_max,
-                )
-
             defer.returnValue(self._current_max)
 
-    def _compute_current_max(self, txn):
-        txn.execute("SELECT MAX(stream_ordering) FROM events")
-        val, = txn.fetchone()
+    def _get_or_compute_current_max(self, txn):
+        with self._lock:
+            txn.execute("SELECT MAX(stream_ordering) FROM events")
+            rows = txn.fetchall()
+            val, = rows[0]
 
-        self._current_max = int(val) if val else 1
+            self._current_max = int(val) if val else 1
 
-        return self._current_max
+            return self._current_max