summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py304
1 files changed, 113 insertions, 191 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 411e47d98d..a08af83a4c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,69 +13,69 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.federation.units import Edu
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import UserPresenceState
-from synapse.util.async import sleep
+from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
 
-from synapse import events
-
-from twisted.internet import reactor, defer
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-import ujson as json
-
-logger = logging.getLogger("synapse.app.appservice")
+logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore,
+    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
-    pass
+    def __init__(self, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+
+        # We pull out the current federation stream position now so that we
+        # always have a known value for the federation position in memory so
+        # that we don't have to bounce via a deferred once when we start the
+        # replication streams.
+        self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
+
+    def _get_federation_out_pos(self, db_conn):
+        sql = (
+            "SELECT stream_id FROM federation_stream_position"
+            " WHERE type = ?"
+        )
+        sql = self.database_engine.convert_param_style(sql)
 
+        txn = db_conn.cursor()
+        txn.execute(sql, ("federation",))
+        rows = txn.fetchall()
+        txn.close()
+
+        return rows[0][0] if rows else -1
 
-class FederationSenderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
 
+class FederationSenderServer(HomeServer):
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
@@ -91,19 +91,18 @@ class FederationSenderServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
-
-        for address in bind_addresses:
-            reactor.listenTCP(
-                port,
-                SynapseSite(
-                    "synapse.access.http.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                ),
-                interface=address
+        root_resource = create_resource_tree(resources, NoResource())
+
+        _base.listen_tcp(
+            bind_addresses,
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
             )
+        )
 
         logger.info("Synapse federation_sender now listening on port %d", port)
 
@@ -112,41 +111,39 @@ class FederationSenderServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                bind_addresses = listener["bind_addresses"]
-
-                for address in bind_addresses:
-                    reactor.listenTCP(
-                        listener["port"],
-                        manhole(
-                            username="matrix",
-                            password="rabbithole",
-                            globals={"hs": self},
-                        ),
-                        interface=address
+                _base.listen_tcp(
+                    listener["bind_addresses"],
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
                     )
+                )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        send_handler = FederationSenderHandler(self)
-
-        send_handler.on_start()
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args.update((yield send_handler.stream_positions()))
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                yield send_handler.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return FederationSenderReplicationHandler(self)
+
+
+class FederationSenderReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
+        self.send_handler = FederationSenderHandler(hs, self)
+
+    def on_rdata(self, stream_name, token, rows):
+        super(FederationSenderReplicationHandler, self).on_rdata(
+            stream_name, token, rows
+        )
+        self.send_handler.process_replication_rows(stream_name, token, rows)
+
+    def get_streams_to_replicate(self):
+        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args.update(self.send_handler.stream_positions())
+        return args
 
 
 def start(config_options):
@@ -160,7 +157,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_sender"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -192,42 +189,27 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        with LoggingContext("run"):
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
-
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-sender",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-sender", config)
 
 
 class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
-    def __init__(self, hs):
+    def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
+        self.replication_client = replication_client
+
+        self.federation_position = self.store.federation_out_pos_startup
+        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+
+        self._last_ack = self.federation_position
 
         self._room_serials = {}
         self._room_typing = {}
@@ -239,98 +221,38 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )
 
-    @defer.inlineCallbacks
     def stream_positions(self):
-        stream_id = yield self.store.get_federation_out_pos("federation")
-        defer.returnValue({
-            "federation": stream_id,
-
-            # Ack stuff we've "processed", this should only be called from
-            # one process.
-            "federation_ack": stream_id,
-        })
+        return {"federation": self.federation_position}
 
-    @defer.inlineCallbacks
-    def process_replication(self, result):
+    def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
-        fed_stream = result.get("federation")
-        if fed_stream:
-            latest_id = int(fed_stream["position"])
-
-            # The federation stream containis a bunch of different types of
-            # rows that need to be handled differently. We parse the rows, put
-            # them into the appropriate collection and then send them off.
-            presence_to_send = {}
-            keyed_edus = {}
-            edus = {}
-            failures = {}
-            device_destinations = set()
-
-            # Parse the rows in the stream
-            for row in fed_stream["rows"]:
-                position, typ, content_js = row
-                content = json.loads(content_js)
-
-                if typ == send_queue.PRESENCE_TYPE:
-                    destination = content["destination"]
-                    state = UserPresenceState.from_dict(content["state"])
-
-                    presence_to_send.setdefault(destination, []).append(state)
-                elif typ == send_queue.KEYED_EDU_TYPE:
-                    key = content["key"]
-                    edu = Edu(**content["edu"])
-
-                    keyed_edus.setdefault(
-                        edu.destination, {}
-                    )[(edu.destination, tuple(key))] = edu
-                elif typ == send_queue.EDU_TYPE:
-                    edu = Edu(**content)
-
-                    edus.setdefault(edu.destination, []).append(edu)
-                elif typ == send_queue.FAILURE_TYPE:
-                    destination = content["destination"]
-                    failure = content["failure"]
-
-                    failures.setdefault(destination, []).append(failure)
-                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
-                    device_destinations.add(content["destination"])
-                else:
-                    raise Exception("Unrecognised federation type: %r", typ)
-
-            # We've finished collecting, send everything off
-            for destination, states in presence_to_send.items():
-                self.federation_sender.send_presence(destination, states)
-
-            for destination, edu_map in keyed_edus.items():
-                for key, edu in edu_map.items():
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=key,
-                    )
-
-            for destination, edu_list in edus.items():
-                for edu in edu_list:
-                    self.federation_sender.send_edu(
-                        edu.destination, edu.edu_type, edu.content, key=None,
-                    )
+        if stream_name == "federation":
+            send_queue.process_rows_for_federation(self.federation_sender, rows)
+            run_in_background(self.update_token, token)
 
-            for destination, failure_list in failures.items():
-                for failure in failure_list:
-                    self.federation_sender.send_failure(destination, failure)
-
-            for destination in device_destinations:
-                self.federation_sender.send_device_messages(destination)
+        # We also need to poke the federation sender when new events happen
+        elif stream_name == "events":
+            self.federation_sender.notify_new_events(token)
 
-            # Record where we are in the stream.
-            yield self.store.update_federation_out_pos(
-                "federation", latest_id
-            )
+    @defer.inlineCallbacks
+    def update_token(self, token):
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-        # We also need to poke the federation sender when new events happen
-        event_stream = result.get("events")
-        if event_stream:
-            latest_pos = event_stream["position"]
-            self.federation_sender.notify_new_events(latest_pos)
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':