diff options
author | Amber H. Brown <hawkowl@atleastfornow.net> | 2020-01-17 01:03:26 +1100 |
---|---|---|
committer | Amber H. Brown <hawkowl@atleastfornow.net> | 2020-01-17 01:03:26 +1100 |
commit | c76a412cc354e632078e3e19579bbafcb7619a51 (patch) | |
tree | f24dc26a76d92c751c620fd46aa2c5c25ca9fafe /synapse/app/federation_sender.py | |
parent | Re-sync every so often, in case caches appear (diff) | |
parent | Add StateMap type alias (#6715) (diff) | |
download | synapse-c76a412cc354e632078e3e19579bbafcb7619a51.tar.xz |
Merge remote-tracking branch 'origin/develop' into hawkowl/cache-config-without-synctl
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r-- | synapse/app/federation_sender.py | 17 |
1 files changed, 6 insertions, 11 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 139221ad34..a57cf991ac 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -40,7 +40,7 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.streams._base import ReceiptsStream from synapse.server import HomeServer -from synapse.storage.engines import create_engine +from synapse.storage.database import Database from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree @@ -59,8 +59,8 @@ class FederationSenderSlaveStore( SlavedDeviceStore, SlavedPresenceStore, ): - def __init__(self, db_conn, hs): - super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + def __init__(self, database: Database, db_conn, hs): + super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs) # We pull out the current federation stream position now so that we # always have a known value for the federation position in memory so @@ -69,7 +69,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" + sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() @@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) self.send_handler = FederationSenderHandler(hs, self) - @defer.inlineCallbacks - def on_rdata(self, stream_name, token, rows): - yield super(FederationSenderReplicationHandler, self).on_rdata( + async def on_rdata(self, stream_name, token, rows): + await super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) @@ -173,8 +172,6 @@ def start(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - database_engine = create_engine(config.database_config) - if config.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" @@ -189,10 +186,8 @@ def start(config_options): ss = FederationSenderServer( config.server_name, - db_config=config.database_config, config=config, version_string="Synapse/" + get_version_string(synapse), - database_engine=database_engine, ) setup_logging(ss, config, use_worker_options=True) |