summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-16 17:34:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-16 17:34:44 +0000
commited787cf09edd77e39ad9da0b957359214de85287 (patch)
treebefff8fe0284099e1e2df9eccd729ff1eedd4727
parentAdd initial cut of federation send queue (diff)
downloadsynapse-ed787cf09edd77e39ad9da0b957359214de85287.tar.xz
Hook up the send queue and create a federation sender worker
-rw-r--r--synapse/app/federation_sender.py302
-rw-r--r--synapse/config/server.py5
-rw-r--r--synapse/federation/send_queue.py89
-rw-r--r--synapse/replication/resource.py24
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py5
-rw-r--r--synapse/replication/slave/storage/transactions.py3
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/presence.py7
8 files changed, 419 insertions, 22 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 0000000000..7a4fec4a66
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,302 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class FederationSenderSlaveStore(
+    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class FederationSenderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation_sender now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        send_handler = self._get_send_handler()
+
+        def replicate(results):
+            stream = results.get("events")
+            if stream:
+                # max_stream_id = stream["position"]
+                # TODO
+                pass
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update(send_handler.stream_positions())
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                send_handler.process_replication(result)
+                replicate(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+    def _get_send_handler(self):
+        try:
+            return self._send_handler
+        except AttributeError:
+            self._send_handler = FederationSenderHandler(self)
+            return self._send_handler
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation sender", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_sender"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.send_federation:
+        sys.stderr.write(
+            "\nThe send_federation must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``send_federation: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.send_federation = True
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ps = FederationSenderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-sender",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+class FederationSenderHandler(object):
+    def __init__(self, hs):
+        self.federation_sender = hs.get_federation_sender()
+
+        self._latest_room_serial = -1
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def stream_positions(self):
+        # We must update this token from the response of the previous
+        # sync. In particular, the stream id may "reset" back to zero/a low
+        # value which we *must* use for the next replication request.
+        return {"federation": self._latest_room_serial}
+
+    def process_replication(self, result):
+        stream = result.get("federation")
+        if stream:
+            self._latest_room_serial = int(stream["position"])
+
+            presence_to_send = {}
+            keyed_edus = {}
+            edus = {}
+            failures = {}
+
+            for row in stream["rows"]:
+                position, typ, content_js = row
+                content = json.loads(content_js)
+
+                if typ == send_queue.PRESENCE_TYPE:
+                    destination = content["destination"]
+                    state = UserPresenceState.from_dict(content["state"])
+
+                    presence_to_send.setdefault(destination, []).append(state)
+                elif typ == send_queue.KEYED_EDU_TYPE:
+                    key = content["key"]
+                    edu = Edu(**content["edu"])
+
+                    keyed_edus.setdefault(edu.destination, {})[key] = edu
+                elif typ == send_queue.EDU_TYPE:
+                    edu = Edu(**content)
+
+                    edus.setdefault(edu.destination, []).append(edu)
+                elif typ == send_queue.FAILURE_TYPE:
+                    destination = content["destination"]
+                    failure = content["failure"]
+
+                    failures.setdefault(destination, []).append(failure)
+                else:
+                    raise Exception("Unrecognised federation type: %r", typ)
+
+            for destination, states in presence_to_send.items():
+                self.federation_sender.send_presence(destination, states)
+
+            for destination, edu_map in keyed_edus.items():
+                for key, edu in edu_map.items():
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=key,
+                    )
+
+            for destination, edu_list in edus.items():
+                for edu in edu_list:
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=None,
+                    )
+
+            for destination, failure_list in failures.items():
+                for failure in failure_list:
+                    self.federation_sender.send_failure(destination, failure)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed5417d0c3..634d8e6fe5 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,6 +30,11 @@ class ServerConfig(Config):
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
 
+        # Whether to send federation traffic out in this process. This only
+        # applies to some federation traffic, and so shouldn't be used to
+        # "disable" federation
+        self.send_federation = config.get("send_federation", True)
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3d3c3d98ff..d439be050a 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -13,11 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .units import Edu
 
 from blist import sorteddict
+import ujson
+
+
+PRESENCE_TYPE = "p"
+KEYED_EDU_TYPE = "k"
+EDU_TYPE = "e"
+FAILURE_TYPE = "f"
 
 
 class FederationRemoteSendQueue(object):
+
     def __init__(self, hs):
         self.clock = hs.get_clock()
 
@@ -68,12 +77,12 @@ class FederationRemoteSendQueue(object):
         for key in keys[:i]:
             del self.presence_changed[key]
 
-        user_ids = set()
-        for _, states in self.presence_changed.values():
-            user_ids.update(s.user_id for s in user_ids)
+        user_ids = set(
+            user_id for uids in self.presence_changed.values() for _, user_id in uids
+        )
 
         to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
-        for user_id in self.to_del:
+        for user_id in to_del:
             del self.presence_map[user_id]
 
         # Delete things out of keyed edus
@@ -102,47 +111,77 @@ class FederationRemoteSendQueue(object):
         for key in keys[:i]:
             del self.failures[key]
 
-    def send_edu(self, edu, key=None):
+    def send_edu(self, destination, edu_type, content, key=None):
         pos = self._next_pos()
 
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
+
         if key:
-            self.keyed_edu[(edu.destination, key)] = edu
-            self.keyed_edu_changed[pos] = (edu.destination, key)
+            self.keyed_edu[(destination, key)] = edu
+            self.keyed_edu_changed[pos] = (destination, key)
         else:
             self.edus[pos] = edu
 
     def send_presence(self, destination, states):
         pos = self._next_pos()
 
-        self.presence_map.presence_map.update({
+        self.presence_map.update({
             state.user_id: state
             for state in states
         })
 
-        self.presence_changed[pos] = (destination, [
-            state.user_id for state in states
-        ])
+        self.presence_changed[pos] = [
+            (destination, state.user_id) for state in states
+        ]
 
     def send_failure(self, failure, destination):
         pos = self._next_pos()
 
-        self.failures[pos] = (destination, failure)
+        self.failures[pos] = (destination, str(failure))
+
+    def send_pdu(self, pdu, destinations):
+        # This gets sent down a separate path
+        pass
 
     def notify_new_device_message(self, destination):
         # TODO
         pass
 
-    def get_replication_rows(self, token):
+    def get_current_token(self):
+        return self.pos - 1
+
+    def get_replication_rows(self, token, limit):
+        # TODO: Handle limit.
+
+        # To handle restarts where we wrap around
+        if token > self.pos:
+            token = -1
+
         rows = []
 
+        # There should be only one reader, so lets delete everything its
+        # acknowledged its seen.
+        self._clear_queue_before_pos(token)
+
         # Fetch changed presence
         keys = self.presence_changed.keys()
         i = keys.bisect_right(token)
-        dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:])
-
-        for (key, (dest, user_ids)) in dest_user_ids:
-            for user_id in user_ids:
-                rows.append((key, dest, "p", self.presence_map[user_id]))
+        dest_user_ids = set(
+            (pos, dest_user_id)
+            for pos in keys[i:]
+            for dest_user_id in self.presence_changed[pos]
+        )
+
+        for (key, (dest, user_id)) in dest_user_ids:
+            rows.append((key, PRESENCE_TYPE, ujson.dumps({
+                "destination": dest,
+                "state": self.presence_map[user_id].as_dict(),
+            })))
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
@@ -150,7 +189,12 @@ class FederationRemoteSendQueue(object):
         keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
 
         for (pos, edu_key) in keyed_edus:
-            rows.append((pos, edu_key, "k", self.keyed_edu[edu_key]))
+            rows.append(
+                (pos, KEYED_EDU_TYPE, ujson.dumps({
+                    "key": edu_key,
+                    "edu": self.keyed_edu[edu_key].get_dict(),
+                }))
+            )
 
         # Fetch changed edus
         keys = self.edus.keys()
@@ -158,7 +202,7 @@ class FederationRemoteSendQueue(object):
         edus = set((k, self.edus[k]) for k in keys[i:])
 
         for (pos, edu) in edus:
-            rows.append((pos, edu.destination, "e", edu))
+            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict())))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -166,7 +210,10 @@ class FederationRemoteSendQueue(object):
         failures = set((k, self.failures[k]) for k in keys[i:])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, destination, "f", failure))
+            rows.append((pos, None, FAILURE_TYPE, ujson.dumps({
+                "destination": destination,
+                "failure": failure,
+            })))
 
         # Sort rows based on pos
         rows.sort()
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 5a14c51d23..a77312ae34 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -44,6 +44,7 @@ STREAM_NAMES = (
     ("caches",),
     ("to_device",),
     ("public_rooms",),
+    ("federation",),
 )
 
 
@@ -116,8 +117,10 @@ class ReplicationResource(Resource):
         self.sources = hs.get_event_sources()
         self.presence_handler = hs.get_presence_handler()
         self.typing_handler = hs.get_typing_handler()
+        self.federation_sender = hs.get_federation_sender()
         self.notifier = hs.notifier
         self.clock = hs.get_clock()
+        self.config = hs.get_config()
 
         self.putChild("remove_pushers", PusherResource(hs))
         self.putChild("syncing_users", PresenceResource(hs))
@@ -134,6 +137,7 @@ class ReplicationResource(Resource):
         pushers_token = self.store.get_pushers_stream_token()
         caches_token = self.store.get_cache_stream_token()
         public_rooms_token = self.store.get_current_public_room_stream_id()
+        federation_token = self.federation_sender.get_current_token()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -148,6 +152,7 @@ class ReplicationResource(Resource):
             caches_token,
             int(stream_token.to_device_key),
             int(public_rooms_token),
+            int(federation_token),
         ))
 
     @request_handler()
@@ -202,6 +207,7 @@ class ReplicationResource(Resource):
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
         yield self.public_rooms(writer, current_token, limit, request_streams)
+        self.federation(writer, current_token, limit, request_streams)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -465,6 +471,23 @@ class ReplicationResource(Resource):
                 "position", "room_id", "visibility"
             ), position=upto_token)
 
+    def federation(self, writer, current_token, limit, request_streams):
+        if self.config.send_federation:
+            return
+
+        current_position = current_token.federation
+
+        federation = request_streams.get("federation")
+
+        if federation is not None and federation != current_position:
+            federation_rows = self.federation_sender.get_replication_rows(
+                federation, limit,
+            )
+            upto_token = _position_from_rows(federation_rows, current_position)
+            writer.write_header_and_rows("federation", federation_rows, (
+                "position", "type", "content",
+            ), position=upto_token)
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
@@ -497,6 +520,7 @@ class _Writer(object):
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
     "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
+    "federation",
 ))):
     __slots__ = []
 
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 3bfd5e8213..373212d42d 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -29,9 +29,14 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
             "DeviceInboxStreamChangeCache",
             self._device_inbox_id_gen.get_current_token()
         )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache",
+            self._device_inbox_id_gen.get_current_token()
+        )
 
     get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
+    get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
     delete_messages_for_device = DataStore.delete_messages_for_device.__func__
 
     def stream_positions(self):
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 6f2ba98af5..c459301b76 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -25,6 +25,9 @@ class TransactionStore(BaseSlavedStore):
     ].orig
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
 
+    def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
+        return []
+
     # For now, don't record the destination rety timings
     def set_destination_retry_timings(*args, **kwargs):
         return defer.succeed(None)
diff --git a/synapse/server.py b/synapse/server.py
index faab617b4f..6c57ab3e18 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -32,6 +32,7 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
+from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
@@ -273,7 +274,10 @@ class HomeServer(object):
         return TransportLayerClient(self)
 
     def build_federation_sender(self):
-        return TransactionQueue(self)
+        if self.config.send_federation:
+            return TransactionQueue(self)
+        else:
+            return FederationRemoteSendQueue(self)
 
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 21d0696640..7460f98a1f 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
     status_msg (str): User set status message.
     """
 
+    def as_dict(self):
+        return dict(self._asdict())
+
+    @staticmethod
+    def from_dict(d):
+        return UserPresenceState(**d)
+
     def copy_and_replace(self, **kwargs):
         return self._replace(**kwargs)