summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/event_creator.py170
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/config/workers.py8
-rw-r--r--synapse/events/snapshot.py72
-rw-r--r--synapse/handlers/message.py28
-rw-r--r--synapse/replication/http/__init__.py31
-rw-r--r--synapse/replication/http/send_event.py108
-rw-r--r--synapse/replication/slave/storage/events.py20
-rw-r--r--synapse/rest/client/v1/room.py1
-rw-r--r--synapse/storage/appservice.py13
-rw-r--r--synapse/storage/background_updates.py19
-rw-r--r--synapse/storage/registration.py7
-rw-r--r--synapse/storage/schema/delta/38/postgres_fts_gist.sql6
-rw-r--r--synapse/storage/schema/delta/47/postgres_fts_gin.sql17
-rw-r--r--synapse/storage/search.py81
-rw-r--r--synapse/types.py65
16 files changed, 610 insertions, 40 deletions
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
new file mode 100644
index 0000000000..b2ce399258
--- /dev/null
+++ b/synapse/app/event_creator.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+
+import synapse
+from synapse import events
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.room import RoomSendEventRestServlet
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.versionstring import get_version_string
+from twisted.internet import reactor
+from twisted.web.resource import Resource
+
+logger = logging.getLogger("synapse.app.event_creator")
+
+
+class EventCreatorSlavedStore(
+    SlavedDeviceStore,
+    SlavedClientIpStore,
+    SlavedApplicationServiceStore,
+    SlavedEventStore,
+    SlavedRegistrationStore,
+    RoomStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class EventCreatorServer(HomeServer):
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_addresses = listener_config["bind_addresses"]
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    RoomSendEventRestServlet(self).register(resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+
+        _base.listen_tcp(
+            bind_addresses,
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            )
+        )
+
+        logger.info("Synapse event creator now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                _base.listen_tcp(
+                    listener["bind_addresses"],
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    )
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse event creator", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.event_creator"
+
+    assert config.worker_replication_http_port is not None
+
+    setup_logging(config, use_worker_options=True)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = EventCreatorServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+
+    reactor.callWhenRunning(start)
+
+    _base.start_worker_reactor("synapse-event-creator", config)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index cb82a415a6..e375f2bbcf 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -38,6 +38,7 @@ from synapse.metrics import register_memory_metrics
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
     check_requirements
+from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
@@ -219,6 +220,9 @@ class SynapseHomeServer(HomeServer):
         if name == "metrics" and self.get_config().enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(self)
 
+        if name == "replication":
+            resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
         return resources
 
     def start_listening(self):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 4b6884918d..80baf0ce0e 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,8 +33,16 @@ class WorkerConfig(Config):
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_file = config.get("worker_log_file")
         self.worker_log_config = config.get("worker_log_config")
+
+        # The host used to connect to the main synapse
         self.worker_replication_host = config.get("worker_replication_host", None)
+
+        # The port on the main synapse for TCP replication
         self.worker_replication_port = config.get("worker_replication_port", None)
+
+        # The port on the main synapse for HTTP replication endpoint
+        self.worker_replication_http_port = config.get("worker_replication_http_port")
+
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 87e3fe7b97..7b80444f73 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 
+from frozendict import frozendict
+
+
 class EventContext(object):
     """
     Attributes:
@@ -73,3 +76,72 @@ class EventContext(object):
         self.prev_state_events = None
 
         self.app_service = None
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "current_state_ids": _encode_state_dict(self.current_state_ids),
+            "prev_state_ids": _encode_state_dict(self.prev_state_ids),
+            "state_group": self.state_group,
+            "rejected": self.rejected,
+            "push_actions": self.push_actions,
+            "prev_group": self.prev_group,
+            "delta_ids": _encode_state_dict(self.delta_ids),
+            "prev_state_events": self.prev_state_events,
+            "app_service_id": self.app_service.id if self.app_service else None
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        EventContext.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            EventContext
+        """
+        context = EventContext()
+        context.current_state_ids = _decode_state_dict(input["current_state_ids"])
+        context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
+        context.state_group = input["state_group"]
+        context.rejected = input["rejected"]
+        context.push_actions = input["push_actions"]
+        context.prev_group = input["prev_group"]
+        context.delta_ids = _decode_state_dict(input["delta_ids"])
+        context.prev_state_events = input["prev_state_events"]
+
+        app_service_id = input["app_service_id"]
+        if app_service_id:
+            context.app_service = store.get_app_service_by_id(app_service_id)
+
+        return context
+
+
+def _encode_state_dict(state_dict):
+    """Since dicts of (type, state_key) -> event_id cannot be serialized in
+    JSON we need to convert them to a form that can.
+    """
+    if state_dict is None:
+        return None
+
+    return [
+        (etype, state_key, v)
+        for (etype, state_key), v in state_dict.iteritems()
+    ]
+
+
+def _decode_state_dict(input):
+    """Decodes a state dict encoded using `_encode_state_dict` above
+    """
+    if input is None:
+        return None
+
+    return frozendict({(etype, state_key,): v for etype, state_key, v in input})
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4e9752ccbd..1c3ac03f20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
 from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
         self.server_name = hs.hostname
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
+        self.config = hs.config
+
+        self.http_client = hs.get_simple_http_client()
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -419,12 +423,6 @@ class EventCreationHandler(object):
             ratelimit=ratelimit,
         )
 
-        if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(user)
-
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
         """
@@ -559,6 +557,18 @@ class EventCreationHandler(object):
     ):
         # We now need to go and hit out to wherever we need to hit out to.
 
+        # If we're a worker we need to hit out to the master.
+        if self.config.worker_app:
+            yield send_event_to_master(
+                self.http_client,
+                host=self.config.worker_replication_host,
+                port=self.config.worker_replication_http_port,
+                requester=requester,
+                event=event,
+                context=context,
+            )
+            return
+
         if ratelimit:
             yield self.base_handler.ratelimit(requester)
 
@@ -692,3 +702,9 @@ class EventCreationHandler(object):
             )
 
         preserve_fn(_notify)()
+
+        if event.type == EventTypes.Message:
+            presence = self.hs.get_presence_handler()
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            preserve_fn(presence.bump_presence_active_time)(requester.user)
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
new file mode 100644
index 0000000000..b378b41646
--- /dev/null
+++ b/synapse/replication/http/__init__.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import send_event
+
+from synapse.http.server import JsonResource
+
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+
+class ReplicationRestResource(JsonResource):
+    def __init__(self, hs):
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(hs)
+
+    def register_servlets(self, hs):
+        send_event.register_servlets(hs, self)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
new file mode 100644
index 0000000000..ff9b9d2f10
--- /dev/null
+++ b/synapse/replication/http/send_event.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.util.metrics import Measure
+from synapse.types import Requester
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def send_event_to_master(client, host, port, requester, event, context):
+    """Send event to be handled on the master
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        event (FrozenEvent)
+        context (EventContext)
+    """
+    uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
+
+    payload = {
+        "event": event.get_pdu_json(),
+        "internal_metadata": event.internal_metadata.get_dict(),
+        "rejected_reason": event.rejected_reason,
+        "context": context.serialize(),
+        "requester": requester.serialize(),
+    }
+
+    return client.post_json_get_json(uri, payload)
+
+
+class ReplicationSendEventRestServlet(RestServlet):
+    """Handles events newly created on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_event
+
+        {
+            "event": { .. serialized event .. },
+            "internal_metadata": { .. serialized internal_metadata .. },
+            "rejected_reason": ..,   // The event.rejected_reason field
+            "context": { .. serialized event context .. },
+            "requester": { .. serialized requester .. },
+        }
+    """
+    PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
+
+    def __init__(self, hs):
+        super(ReplicationSendEventRestServlet, self).__init__()
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        with Measure(self.clock, "repl_send_event_parse"):
+            content = parse_json_object_from_request(request)
+
+            event_dict = content["event"]
+            internal_metadata = content["internal_metadata"]
+            rejected_reason = content["rejected_reason"]
+            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            requester = Requester.deserialize(self.store, content["requester"])
+            context = EventContext.deserialize(self.store, content["context"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "Got event to send with ID: %s into room: %s",
+            event.event_id, event.room_id,
+        )
+
+        yield self.event_creation_handler.handle_new_client_event(
+            requester, event, context,
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 8acb5df0f3..f8c164b48b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -21,6 +21,7 @@ from synapse.storage.event_push_actions import EventPushActionsStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamStore
+from synapse.storage.signatures import SignatureStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
@@ -170,6 +171,25 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
     get_federation_out_pos = DataStore.get_federation_out_pos.__func__
     update_federation_out_pos = DataStore.update_federation_out_pos.__func__
 
+    get_latest_event_ids_and_hashes_in_room = (
+        DataStore.get_latest_event_ids_and_hashes_in_room.__func__
+    )
+    _get_latest_event_ids_and_hashes_in_room = (
+        DataStore._get_latest_event_ids_and_hashes_in_room.__func__
+    )
+    _get_event_reference_hashes_txn = (
+        DataStore._get_event_reference_hashes_txn.__func__
+    )
+    add_event_hashes = (
+        DataStore.add_event_hashes.__func__
+    )
+    get_event_reference_hashes = (
+        SignatureStore.__dict__["get_event_reference_hashes"]
+    )
+    get_event_reference_hash = (
+        SignatureStore.__dict__["get_event_reference_hash"]
+    )
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fbb2fc36e4..817fd47842 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -186,7 +186,6 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomSendEventRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
 
     def register(self, http_server):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d8c84b7141..79673b4273 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -99,6 +99,19 @@ class ApplicationServiceStore(SQLBaseStore):
                 return service
         return None
 
+    def get_app_service_by_id(self, as_id):
+        """Get the application service with the given appservice ID.
+
+        Args:
+            as_id (str): The application service ID.
+        Returns:
+            synapse.appservice.ApplicationService or None.
+        """
+        for service in self.services_cache:
+            if service.id == as_id:
+                return service
+        return None
+
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 11a1b942f1..c88759bf2c 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -242,6 +242,25 @@ class BackgroundUpdateStore(SQLBaseStore):
         """
         self._background_update_handlers[update_name] = update_handler
 
+    def register_noop_background_update(self, update_name):
+        """Register a noop handler for a background update.
+
+        This is useful when we previously did a background update, but no
+        longer wish to do the update. In this case the background update should
+        be removed from the schema delta files, but there may still be some
+        users who have the background update queued, so this method should
+        also be called to clear the update.
+
+        Args:
+            update_name (str): Name of update
+        """
+        @defer.inlineCallbacks
+        def noop_update(progress, batch_size):
+            yield self._end_background_update(update_name)
+            defer.returnValue(1)
+
+        self.register_background_update_handler(update_name, noop_update)
+
     def register_background_index_update(self, update_name, index_name,
                                          table, columns, where_clause=None,
                                          unique=False,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3aa810981f..95f75d6df1 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -39,12 +39,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         # we no longer use refresh tokens, but it's possible that some people
         # might have a background update queued to build this index. Just
         # clear the background update.
-        @defer.inlineCallbacks
-        def noop_update(progress, batch_size):
-            yield self._end_background_update("refresh_tokens_device_index")
-            defer.returnValue(1)
-        self.register_background_update_handler(
-            "refresh_tokens_device_index", noop_update)
+        self.register_noop_background_update("refresh_tokens_device_index")
 
     @defer.inlineCallbacks
     def add_access_token_to_user(self, user_id, token, device_id=None):
diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql
index f090a7b75a..515e6b8e84 100644
--- a/synapse/storage/schema/delta/38/postgres_fts_gist.sql
+++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql
@@ -13,5 +13,7 @@
  * limitations under the License.
  */
 
- INSERT into background_updates (update_name, progress_json)
-     VALUES ('event_search_postgres_gist', '{}');
+-- We no longer do this given we back it out again in schema 47
+
+-- INSERT into background_updates (update_name, progress_json)
+--     VALUES ('event_search_postgres_gist', '{}');
diff --git a/synapse/storage/schema/delta/47/postgres_fts_gin.sql b/synapse/storage/schema/delta/47/postgres_fts_gin.sql
new file mode 100644
index 0000000000..31d7a817eb
--- /dev/null
+++ b/synapse/storage/schema/delta/47/postgres_fts_gin.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_search_postgres_gin', '{}');
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f1ac9ba0fd..2755acff40 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -38,6 +38,7 @@ class SearchStore(BackgroundUpdateStore):
     EVENT_SEARCH_UPDATE_NAME = "event_search"
     EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
     EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
+    EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
 
     def __init__(self, db_conn, hs):
         super(SearchStore, self).__init__(db_conn, hs)
@@ -48,9 +49,19 @@ class SearchStore(BackgroundUpdateStore):
             self.EVENT_SEARCH_ORDER_UPDATE_NAME,
             self._background_reindex_search_order
         )
-        self.register_background_update_handler(
+
+        # we used to have a background update to turn the GIN index into a
+        # GIST one; we no longer do that (obviously) because we actually want
+        # a GIN index. However, it's possible that some people might still have
+        # the background update queued, so we register a handler to clear the
+        # background update.
+        self.register_noop_background_update(
             self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME,
-            self._background_reindex_gist_search
+        )
+
+        self.register_background_update_handler(
+            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME,
+            self._background_reindex_gin_search
         )
 
     @defer.inlineCallbacks
@@ -151,25 +162,48 @@ class SearchStore(BackgroundUpdateStore):
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def _background_reindex_gist_search(self, progress, batch_size):
+    def _background_reindex_gin_search(self, progress, batch_size):
+        """This handles old synapses which used GIST indexes, if any;
+        converting them back to be GIN as per the actual schema.
+        """
+
         def create_index(conn):
             conn.rollback()
-            conn.set_session(autocommit=True)
-            c = conn.cursor()
 
-            c.execute(
-                "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist"
-                " ON event_search USING GIST (vector)"
-            )
+            # we have to set autocommit, because postgres refuses to
+            # CREATE INDEX CONCURRENTLY without it.
+            conn.set_session(autocommit=True)
 
-            c.execute("DROP INDEX event_search_fts_idx")
+            try:
+                c = conn.cursor()
 
-            conn.set_session(autocommit=False)
+                # if we skipped the conversion to GIST, we may already/still
+                # have an event_search_fts_idx; unfortunately postgres 9.4
+                # doesn't support CREATE INDEX IF EXISTS so we just catch the
+                # exception and ignore it.
+                import psycopg2
+                try:
+                    c.execute(
+                        "CREATE INDEX CONCURRENTLY event_search_fts_idx"
+                        " ON event_search USING GIN (vector)"
+                    )
+                except psycopg2.ProgrammingError as e:
+                    logger.warn(
+                        "Ignoring error %r when trying to switch from GIST to GIN",
+                        e
+                    )
+
+                # we should now be able to delete the GIST index.
+                c.execute(
+                    "DROP INDEX IF EXISTS event_search_fts_idx_gist"
+                )
+            finally:
+                conn.set_session(autocommit=False)
 
         if isinstance(self.database_engine, PostgresEngine):
             yield self.runWithConnection(create_index)
 
-        yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME)
+        yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME)
         defer.returnValue(1)
 
     @defer.inlineCallbacks
@@ -289,7 +323,30 @@ class SearchStore(BackgroundUpdateStore):
                 entry.stream_ordering, entry.origin_server_ts,
             ) for entry in entries)
 
+            # inserts to a GIN index are normally batched up into a pending
+            # list, and then all committed together once the list gets to a
+            # certain size. The trouble with that is that postgres (pre-9.5)
+            # uses work_mem to determine the length of the list, and work_mem
+            # is typically very large.
+            #
+            # We therefore reduce work_mem while we do the insert.
+            #
+            # (postgres 9.5 uses the separate gin_pending_list_limit setting,
+            # so doesn't suffer the same problem, but changing work_mem will
+            # be harmless)
+            #
+            # Note that we don't need to worry about restoring it on
+            # exception, because exceptions will cause the transaction to be
+            # rolled back, including the effects of the SET command.
+            #
+            # Also: we use SET rather than SET LOCAL because there's lots of
+            # other stuff going on in this transaction, which want to have the
+            # normal work_mem setting.
+
+            txn.execute("SET work_mem='256kB'")
             txn.executemany(sql, args)
+            txn.execute("RESET work_mem")
+
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
                 "INSERT INTO event_search (event_id, room_id, key, value)"
diff --git a/synapse/types.py b/synapse/types.py
index 6e76c016d9..7cb24cecb2 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -19,20 +19,59 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
 
-Requester = namedtuple("Requester", [
+class Requester(namedtuple("Requester", [
     "user", "access_token_id", "is_guest", "device_id", "app_service",
-])
-"""
-Represents the user making a request
-
-Attributes:
-    user (UserID):  id of the user making the request
-    access_token_id (int|None):  *ID* of the access token used for this
-        request, or None if it came via the appservice API or similar
-    is_guest (bool):  True if the user making this request is a guest user
-    device_id (str|None):  device_id which was set at authentication time
-    app_service (ApplicationService|None):  the AS requesting on behalf of the user
-"""
+])):
+    """
+    Represents the user making a request
+
+    Attributes:
+        user (UserID):  id of the user making the request
+        access_token_id (int|None):  *ID* of the access token used for this
+            request, or None if it came via the appservice API or similar
+        is_guest (bool):  True if the user making this request is a guest user
+        device_id (str|None):  device_id which was set at authentication time
+        app_service (ApplicationService|None):  the AS requesting on behalf of the user
+    """
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "user_id": self.user.to_string(),
+            "access_token_id": self.access_token_id,
+            "is_guest": self.is_guest,
+            "device_id": self.device_id,
+            "app_server_id": self.app_service.id if self.app_service else None,
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        Requester.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            Requester
+        """
+        appservice = None
+        if input["app_server_id"]:
+            appservice = store.get_app_service_by_id(input["app_server_id"])
+
+        return Requester(
+            user=UserID.from_string(input["user_id"]),
+            access_token_id=input["access_token_id"],
+            is_guest=input["is_guest"],
+            device_id=input["device_id"],
+            app_service=appservice,
+        )
 
 
 def create_requester(user_id, access_token_id=None, is_guest=False,