summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-10 18:26:36 +0100
committerGitHub <noreply@github.com>2020-07-10 18:26:36 +0100
commitf299441cc67f31dcd47b8fdfda4a218bee9df9ba (patch)
treebba78ca419a547249491c81f3c9968cf526c13b1 /synapse/app/generic_worker.py
parentFix resync remote devices on receive PDU in worker mode. (#7815) (diff)
downloadsynapse-f299441cc67f31dcd47b8fdfda4a218bee9df9ba.tar.xz
Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py59
1 files changed, 16 insertions, 43 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f6792d9fc8..e90695f026 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -511,25 +511,7 @@ class GenericWorkerSlavedStore(
     SearchWorkerStore,
     BaseSlavedStore,
 ):
-    def __init__(self, database, db_conn, hs):
-        super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs)
-
-        # We pull out the current federation stream position now so that we
-        # always have a known value for the federation position in memory so
-        # that we don't have to bounce via a deferred once when we start the
-        # replication streams.
-        self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
-
-    def _get_federation_out_pos(self, db_conn):
-        sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
-        sql = self.database_engine.convert_param_style(sql)
-
-        txn = db_conn.cursor()
-        txn.execute(sql, ("federation",))
-        rows = txn.fetchall()
-        txn.close()
-
-        return rows[0][0] if rows else -1
+    pass
 
 
 class GenericWorkerServer(HomeServer):
@@ -812,19 +794,11 @@ class FederationSenderHandler(object):
         self.federation_sender = hs.get_federation_sender()
         self._hs = hs
 
-        # if the worker is restarted, we want to pick up where we left off in
-        # the replication stream, so load the position from the database.
-        #
-        # XXX is this actually worthwhile? Whenever the master is restarted, we'll
-        # drop some rows anyway (which is mostly fine because we're only dropping
-        # typing and presence notifications). If the replication stream is
-        # unreliable, why do we do all this hoop-jumping to store the position in the
-        # database? See also https://github.com/matrix-org/synapse/issues/7535.
-        #
-        self.federation_position = self.store.federation_out_pos_startup
+        # Stores the latest position in the federation stream we've gotten up
+        # to. This is always set before we use it.
+        self.federation_position = None
 
         self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
-        self._last_ack = self.federation_position
 
     def on_start(self):
         # There may be some events that are persisted but haven't been sent,
@@ -932,7 +906,6 @@ class FederationSenderHandler(object):
                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
                 self._hs.get_tcp_replication().send_federation_ack(current_position)
-                self._last_ack = current_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
@@ -960,7 +933,7 @@ def start(config_options):
     )
 
     if config.worker_app == "synapse.app.appservice":
-        if config.notify_appservices:
+        if config.appservice.notify_appservices:
             sys.stderr.write(
                 "\nThe appservices must be disabled in the main synapse process"
                 "\nbefore they can be run in a separate worker."
@@ -970,13 +943,13 @@ def start(config_options):
             sys.exit(1)
 
         # Force the appservice to start since they will be disabled in the main config
-        config.notify_appservices = True
+        config.appservice.notify_appservices = True
     else:
         # For other worker types we force this to off.
-        config.notify_appservices = False
+        config.appservice.notify_appservices = False
 
     if config.worker_app == "synapse.app.pusher":
-        if config.start_pushers:
+        if config.server.start_pushers:
             sys.stderr.write(
                 "\nThe pushers must be disabled in the main synapse process"
                 "\nbefore they can be run in a separate worker."
@@ -986,13 +959,13 @@ def start(config_options):
             sys.exit(1)
 
         # Force the pushers to start since they will be disabled in the main config
-        config.start_pushers = True
+        config.server.start_pushers = True
     else:
         # For other worker types we force this to off.
-        config.start_pushers = False
+        config.server.start_pushers = False
 
     if config.worker_app == "synapse.app.user_dir":
-        if config.update_user_directory:
+        if config.server.update_user_directory:
             sys.stderr.write(
                 "\nThe update_user_directory must be disabled in the main synapse process"
                 "\nbefore they can be run in a separate worker."
@@ -1002,13 +975,13 @@ def start(config_options):
             sys.exit(1)
 
         # Force the pushers to start since they will be disabled in the main config
-        config.update_user_directory = True
+        config.server.update_user_directory = True
     else:
         # For other worker types we force this to off.
-        config.update_user_directory = False
+        config.server.update_user_directory = False
 
     if config.worker_app == "synapse.app.federation_sender":
-        if config.send_federation:
+        if config.federation.send_federation:
             sys.stderr.write(
                 "\nThe send_federation must be disabled in the main synapse process"
                 "\nbefore they can be run in a separate worker."
@@ -1018,10 +991,10 @@ def start(config_options):
             sys.exit(1)
 
         # Force the pushers to start since they will be disabled in the main config
-        config.send_federation = True
+        config.federation.send_federation = True
     else:
         # For other worker types we force this to off.
-        config.send_federation = False
+        config.federation.send_federation = False
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts