summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/errors.py3
-rwxr-xr-xsynapse/app/homeserver.py36
-rwxr-xr-xsynapse/app/synctl.py4
-rw-r--r--synapse/handlers/device.py6
-rw-r--r--synapse/handlers/e2e_keys.py56
-rw-r--r--synapse/handlers/identity.py8
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/register.py14
-rw-r--r--synapse/python_dependencies.py1
-rw-r--r--synapse/replication/tcp/commands.py8
-rw-r--r--synapse/state.py55
-rw-r--r--synapse/storage/events.py62
-rw-r--r--synapse/storage/group_server.py2
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py7
-rw-r--r--synapse/storage/roommember.py6
-rw-r--r--synapse/storage/schema/delta/14/upgrade_appservice_db.py3
-rw-r--r--synapse/storage/search.py6
-rw-r--r--synapse/storage/user_directory.py4
-rw-r--r--synapse/util/frozenutils.py19
20 files changed, 204 insertions, 104 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index aa15f73f36..bee59e80dd 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -15,9 +15,10 @@
 
 """Contains exceptions and error codes."""
 
-import json
 import logging
 
+import simplejson as json
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 777e9c529a..a0e465d644 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -402,6 +403,10 @@ def run(hs):
 
     stats = {}
 
+    # Contains the list of processes we will be monitoring
+    # currently either 0 or 1
+    stats_process = []
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -431,6 +436,15 @@ def run(hs):
 
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
         stats["daily_sent_messages"] = daily_sent_messages
+        stats["cache_factor"] = CACHE_SIZE_FACTOR
+        stats["event_cache_size"] = hs.config.event_cache_size
+
+        if len(stats_process) > 0:
+            stats["memory_rss"] = 0
+            stats["cpu_average"] = 0
+            for process in stats_process:
+                stats["memory_rss"] += process.memory_info().rss
+                stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -441,10 +455,32 @@ def run(hs):
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
 
+    def performance_stats_init():
+        try:
+            import psutil
+            process = psutil.Process()
+            # Ensure we can fetch both, and make the initial request for cpu_percent
+            # so the next request will use this as the initial point.
+            process.memory_info().rss
+            process.cpu_percent(interval=None)
+            logger.info("report_stats can use psutil")
+            stats_process.append(process)
+        except (ImportError, AttributeError):
+            logger.warn(
+                "report_stats enabled but psutil is not installed or incorrect version."
+                " Disabling reporting of memory/cpu stats."
+                " Ensuring psutil is available will help matrix.org track performance"
+                " changes across releases."
+            )
+
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
         clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
 
+        # We need to defer this init for the cases that we daemonize
+        # otherwise the process ID we get is that of the non-daemon process
+        clock.call_later(0, performance_stats_init)
+
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 0f0ddfa78a..b0e1b5e66a 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -38,7 +38,7 @@ def pid_running(pid):
     try:
         os.kill(pid, 0)
         return True
-    except OSError, err:
+    except OSError as err:
         if err.errno == errno.EPERM:
             return True
         return False
@@ -98,7 +98,7 @@ def stop(pidfile, app):
         try:
             os.kill(pid, signal.SIGTERM)
             write("stopped %s" % (app,), colour=GREEN)
-        except OSError, err:
+        except OSError as err:
             if err.errno == errno.ESRCH:
                 write("%s not running" % (app,), colour=YELLOW)
             elif err.errno == errno.EPERM:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 40f3d24678..f7457a7082 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -155,7 +155,7 @@ class DeviceHandler(BaseHandler):
 
         try:
             yield self.store.delete_device(user_id, device_id)
-        except errors.StoreError, e:
+        except errors.StoreError as e:
             if e.code == 404:
                 # no match
                 pass
@@ -204,7 +204,7 @@ class DeviceHandler(BaseHandler):
 
         try:
             yield self.store.delete_devices(user_id, device_ids)
-        except errors.StoreError, e:
+        except errors.StoreError as e:
             if e.code == 404:
                 # no match
                 pass
@@ -243,7 +243,7 @@ class DeviceHandler(BaseHandler):
                 new_display_name=content.get("display_name")
             )
             yield self.notify_device_update(user_id, [device_id])
-        except errors.StoreError, e:
+        except errors.StoreError as e:
             if e.code == 404:
                 raise errors.NotFoundError()
             else:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 80b359b2e7..325c0c4a9f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -134,23 +135,8 @@ class E2eKeysHandler(object):
                     if user_id in destination_query:
                         results[user_id] = keys
 
-            except CodeMessageException as e:
-                failures[destination] = {
-                    "status": e.code, "message": e.message
-                }
-            except NotRetryingDestination as e:
-                failures[destination] = {
-                    "status": 503, "message": "Not ready for retry",
-                }
-            except FederationDeniedError as e:
-                failures[destination] = {
-                    "status": 403, "message": "Federation Denied",
-                }
             except Exception as e:
-                # include ConnectionRefused and other errors
-                failures[destination] = {
-                    "status": 503, "message": e.message
-                }
+                failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
             preserve_fn(do_remote_query)(destination)
@@ -252,19 +238,8 @@ class E2eKeysHandler(object):
                 for user_id, keys in remote_result["one_time_keys"].items():
                     if user_id in device_keys:
                         json_result[user_id] = keys
-            except CodeMessageException as e:
-                failures[destination] = {
-                    "status": e.code, "message": e.message
-                }
-            except NotRetryingDestination as e:
-                failures[destination] = {
-                    "status": 503, "message": "Not ready for retry",
-                }
             except Exception as e:
-                # include ConnectionRefused and other errors
-                failures[destination] = {
-                    "status": 503, "message": e.message
-                }
+                failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
             preserve_fn(claim_client_keys)(destination)
@@ -362,6 +337,31 @@ class E2eKeysHandler(object):
         )
 
 
+def _exception_to_failure(e):
+    if isinstance(e, CodeMessageException):
+        return {
+            "status": e.code, "message": e.message,
+        }
+
+    if isinstance(e, NotRetryingDestination):
+        return {
+            "status": 503, "message": "Not ready for retry",
+        }
+
+    if isinstance(e, FederationDeniedError):
+        return {
+            "status": 403, "message": "Federation Denied",
+        }
+
+    # include ConnectionRefused and other errors
+    #
+    # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
+    # give a string for e.message, which simplejson then fails to serialize.
+    return {
+        "status": 503, "message": str(e.message),
+    }
+
+
 def _one_time_keys_match(old_key_json, new_key):
     old_key = json.loads(old_key_json)
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 9efcdff1d6..91a0898860 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -15,6 +15,11 @@
 # limitations under the License.
 
 """Utilities for interacting with Identity Servers"""
+
+import logging
+
+import simplejson as json
+
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -24,9 +29,6 @@ from ._base import BaseHandler
 from synapse.util.async import run_on_reactor
 from synapse.api.errors import SynapseError, Codes
 
-import json
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5a8ddc253e..6de6e13b7b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import unfreeze
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
@@ -678,7 +678,7 @@ class EventCreationHandler(object):
 
         # Ensure that we can round trip before trying to persist in db
         try:
-            dump = simplejson.dumps(unfreeze(event.content))
+            dump = frozendict_json_encoder.encode(event.content)
             simplejson.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index ed5939880a..dd03705279 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
 from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
@@ -46,6 +46,10 @@ class RegistrationHandler(BaseHandler):
 
         self.macaroon_gen = hs.get_macaroon_generator()
 
+        self._generate_user_id_linearizer = Linearizer(
+            name="_generate_user_id_linearizer",
+        )
+
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None,
                        assigned_user_id=None):
@@ -345,9 +349,11 @@ class RegistrationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _generate_user_id(self, reseed=False):
         if reseed or self._next_generated_user_id is None:
-            self._next_generated_user_id = (
-                yield self.store.find_next_generated_user_id_localpart()
-            )
+            with (yield self._generate_user_id_linearizer.queue(())):
+                if reseed or self._next_generated_user_id is None:
+                    self._next_generated_user_id = (
+                        yield self.store.find_next_generated_user_id_localpart()
+                    )
 
         id = self._next_generated_user_id
         self._next_generated_user_id += 1
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 91179ce532..40eedb63cb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -34,7 +34,6 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "ujson": ["ujson"],
     "blist": ["blist"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0005ad5879..12aac3cc6b 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -24,6 +24,8 @@ import simplejson
 
 logger = logging.getLogger(__name__)
 
+_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
+
 
 class Command(object):
     """The base command class.
@@ -107,7 +109,7 @@ class RdataCommand(Command):
         return " ".join((
             self.stream_name,
             str(self.token) if self.token is not None else "batch",
-            simplejson.dumps(self.row, namedtuple_as_object=False),
+            _json_encoder.encode(self.row),
         ))
 
 
@@ -302,7 +304,7 @@ class InvalidateCacheCommand(Command):
 
     def to_line(self):
         return " ".join((
-            self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False)
+            self.cache_func, _json_encoder.encode(self.keys),
         ))
 
 
@@ -334,7 +336,7 @@ class UserIpCommand(Command):
         )
 
     def to_line(self):
-        return self.user_id + " " + simplejson.dumps((
+        return self.user_id + " " + _json_encoder.encode((
             self.access_token, self.ip, self.user_agent, self.device_id,
             self.last_seen,
         ))
diff --git a/synapse/state.py b/synapse/state.py
index a7f20350f1..26093c8434 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -483,33 +483,34 @@ class StateResolutionHandler(object):
                     key: e_ids.pop() for key, e_ids in state.iteritems()
                 }
 
-            # if the new state matches any of the input state groups, we can
-            # use that state group again. Otherwise we will generate a state_id
-            # which will be used as a cache key for future resolutions, but
-            # not get persisted.
-            state_group = None
-            new_state_event_ids = frozenset(new_state.itervalues())
-            for sg, events in state_groups_ids.iteritems():
-                if new_state_event_ids == frozenset(e_id for e_id in events):
-                    state_group = sg
-                    break
-
-            # TODO: We want to create a state group for this set of events, to
-            # increase cache hits, but we need to make sure that it doesn't
-            # end up as a prev_group without being added to the database
-
-            prev_group = None
-            delta_ids = None
-            for old_group, old_ids in state_groups_ids.iteritems():
-                if not set(new_state) - set(old_ids):
-                    n_delta_ids = {
-                        k: v
-                        for k, v in new_state.iteritems()
-                        if old_ids.get(k) != v
-                    }
-                    if not delta_ids or len(n_delta_ids) < len(delta_ids):
-                        prev_group = old_group
-                        delta_ids = n_delta_ids
+            with Measure(self.clock, "state.create_group_ids"):
+                # if the new state matches any of the input state groups, we can
+                # use that state group again. Otherwise we will generate a state_id
+                # which will be used as a cache key for future resolutions, but
+                # not get persisted.
+                state_group = None
+                new_state_event_ids = frozenset(new_state.itervalues())
+                for sg, events in state_groups_ids.iteritems():
+                    if new_state_event_ids == frozenset(e_id for e_id in events):
+                        state_group = sg
+                        break
+
+                # TODO: We want to create a state group for this set of events, to
+                # increase cache hits, but we need to make sure that it doesn't
+                # end up as a prev_group without being added to the database
+
+                prev_group = None
+                delta_ids = None
+                for old_group, old_ids in state_groups_ids.iteritems():
+                    if not set(new_state) - set(old_ids):
+                        n_delta_ids = {
+                            k: v
+                            for k, v in new_state.iteritems()
+                            if old_ids.get(k) != v
+                        }
+                        if not delta_ids or len(n_delta_ids) < len(delta_ids):
+                            prev_group = old_group
+                            delta_ids = n_delta_ids
 
             cache = _StateCacheEntry(
                 state=new_state,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 85ce6bea1a..ece5e6c41f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,15 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.events_worker import EventsWorkerStore
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+import logging
 
+import simplejson as json
 from twisted.internet import defer
 
-from synapse.events import USE_FROZEN_DICTS
 
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.util.async import ObservableDeferred
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable,
 )
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -30,16 +34,8 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.types import get_domain_from_id
-
-from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple, OrderedDict
-from functools import wraps
-
 import synapse.metrics
 
-import logging
-import simplejson as json
-
 # these are only included to make the type annotations work
 from synapse.events import EventBase    # noqa: F401
 from synapse.events.snapshot import EventContext   # noqa: F401
@@ -53,12 +49,25 @@ event_counter = metrics.register_counter(
     "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
 )
 
+# The number of times we are recalculating the current state
+state_delta_counter = metrics.register_counter(
+    "state_delta",
+)
+# The number of times we are recalculating state when there is only a
+# single forward extremity
+state_delta_single_event_counter = metrics.register_counter(
+    "state_delta_single_event",
+)
+# The number of times we are reculating state when we could have resonably
+# calculated the delta when we calculated the state for an event we were
+# persisting.
+state_delta_reuse_delta_counter = metrics.register_counter(
+    "state_delta_reuse_delta",
+)
+
 
 def encode_json(json_object):
-    if USE_FROZEN_DICTS:
-        return encode_canonical_json(json_object)
-    else:
-        return json.dumps(json_object, ensure_ascii=False)
+    return frozendict_json_encoder.encode(json_object)
 
 
 class _EventPeristenceQueue(object):
@@ -368,7 +377,8 @@ class EventsStore(EventsWorkerStore):
                                 room_id, ev_ctx_rm, latest_event_ids
                             )
 
-                            if new_latest_event_ids == set(latest_event_ids):
+                            latest_event_ids = set(latest_event_ids)
+                            if new_latest_event_ids == latest_event_ids:
                                 # No change in extremities, so no change in state
                                 continue
 
@@ -389,6 +399,26 @@ class EventsStore(EventsWorkerStore):
                                 if all_single_prev_not_state:
                                     continue
 
+                            state_delta_counter.inc()
+                            if len(new_latest_event_ids) == 1:
+                                state_delta_single_event_counter.inc()
+
+                                # This is a fairly handwavey check to see if we could
+                                # have guessed what the delta would have been when
+                                # processing one of these events.
+                                # What we're interested in is if the latest extremities
+                                # were the same when we created the event as they are
+                                # now. When this server creates a new event (as opposed
+                                # to receiving it over federation) it will use the
+                                # forward extremities as the prev_events, so we can
+                                # guess this by looking at the prev_events and checking
+                                # if they match the current forward extremities.
+                                for ev, _ in ev_ctx_rm:
+                                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                                    if latest_event_ids == prev_event_ids:
+                                        state_delta_reuse_delta_counter.inc()
+                                        break
+
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index 8fde1aab8e..d03858234b 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError
 
 from ._base import SQLBaseStore
 
-import ujson as json
+import simplejson as json
 
 
 # The category ID for the "default" category. We don't store as null in the
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d809b2ba46..6b557ca0cf 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -460,14 +460,12 @@ class RegistrationStore(RegistrationWorkerStore,
         """
         def _find_next_generated_user_id(txn):
             txn.execute("SELECT name FROM users")
-            rows = self.cursor_to_dict(txn)
 
             regex = re.compile("^@(\d+):")
 
             found = set()
 
-            for r in rows:
-                user_id = r["name"]
+            for user_id, in txn:
                 match = regex.search(user_id)
                 if match:
                     found.add(int(match.group(1)))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 908551d6d9..740c036975 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -594,7 +594,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
         while next_token:
             sql = """
-                SELECT stream_ordering, content FROM events
+                SELECT stream_ordering, json FROM events
+                JOIN event_json USING (event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -606,8 +607,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
             next_token = None
             for stream_ordering, content_json in txn:
                 next_token = stream_ordering
-                content = json.loads(content_json)
-
+                event_json = json.loads(content_json)
+                content = event_json["content"]
                 content_url = content.get("url")
                 thumbnail_url = content.get("info", {}).get("thumbnail_url")
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d662d1cfc0..6a861943a2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -645,8 +645,9 @@ class RoomMemberStore(RoomMemberWorkerStore):
 
         def add_membership_profile_txn(txn):
             sql = ("""
-                SELECT stream_ordering, event_id, events.room_id, content
+                SELECT stream_ordering, event_id, events.room_id, event_json.json
                 FROM events
+                INNER JOIN event_json USING (event_id)
                 INNER JOIN room_memberships USING (event_id)
                 WHERE ? <= stream_ordering AND stream_ordering < ?
                 AND type = 'm.room.member'
@@ -667,7 +668,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 try:
-                    content = json.loads(row["content"])
+                    event_json = json.loads(row["json"])
+                    content = event_json['content']
                 except Exception:
                     continue
 
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
index 8755bb2e49..4d725b92fe 100644
--- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -12,9 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
 import logging
 
+import simplejson as json
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 984643b057..426cbe6e1a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -75,8 +75,9 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id, room_id, type, content, "
+                "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
+                " JOIN event_json USING (event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -104,7 +105,8 @@ class SearchStore(BackgroundUpdateStore):
                     stream_ordering = row["stream_ordering"]
                     origin_server_ts = row["origin_server_ts"]
                     try:
-                        content = json.loads(row["content"])
+                        event_json = json.loads(row["json"])
+                        content = event_json["content"]
                     except Exception:
                         continue
 
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index dfdcbb3181..d6e289ffbe 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -667,7 +667,7 @@ class UserDirectoryStore(SQLBaseStore):
             # The array of numbers are the weights for the various part of the
             # search: (domain, _, display name, localpart)
             sql = """
-                SELECT d.user_id, display_name, avatar_url
+                SELECT d.user_id AS user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
                 %s
@@ -702,7 +702,7 @@ class UserDirectoryStore(SQLBaseStore):
             search_query = _parse_query_sqlite(search_term)
 
             sql = """
-                SELECT d.user_id, display_name, avatar_url
+                SELECT d.user_id AS user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
                 %s
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 6322f0f55c..f497b51f4a 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from frozendict import frozendict
+import simplejson as json
 
 
 def freeze(o):
@@ -49,3 +50,21 @@ def unfreeze(o):
         pass
 
     return o
+
+
+def _handle_frozendict(obj):
+    """Helper for EventEncoder. Makes frozendicts serializable by returning
+    the underlying dict
+    """
+    if type(obj) is frozendict:
+        # fishing the protected dict out of the object is a bit nasty,
+        # but we don't really want the overhead of copying the dict.
+        return obj._dict
+    raise TypeError('Object of type %s is not JSON serializable' %
+                    obj.__class__.__name__)
+
+
+# A JSONEncoder which is capable of encoding frozendics without barfing
+frozendict_json_encoder = json.JSONEncoder(
+    default=_handle_frozendict,
+)