From 7a86ca69369e527168b0c402724d1d4b839d25e9 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 7 Sep 2024 04:39:53 +0200 Subject: Synapse worker changes --- .../services/matrix/synapse/workers/media-repo.nix | 3 - .../services/matrix/synapse/workers/module.nix | 47 +++++++- .../stream-writers/account_data-stream-writer.nix | 5 +- .../stream-writers/presence-stream-writer.nix | 2 +- .../stream-writers/push_rule-stream-writer.nix | 118 ++++++++++++++++++++ .../stream-writers/push_rules-stream-writer.nix | 118 -------------------- .../stream-writers/receipt-stream-writer.nix | 121 +++++++++++++++++++++ .../stream-writers/receipts-stream-writer.nix | 118 -------------------- .../stream-writers/shared-stream-writer.nix | 9 +- .../stream-writers/to_device-stream-writer.nix | 2 +- .../stream-writers/typing-stream-writer.nix | 2 +- 11 files changed, 296 insertions(+), 249 deletions(-) create mode 100644 host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rule-stream-writer.nix delete mode 100644 host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rules-stream-writer.nix create mode 100644 host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipt-stream-writer.nix delete mode 100644 host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipts-stream-writer.nix (limited to 'host/Rory-nginx/services/matrix/synapse/workers') diff --git a/host/Rory-nginx/services/matrix/synapse/workers/media-repo.nix b/host/Rory-nginx/services/matrix/synapse/workers/media-repo.nix index a361390..e52010c 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/media-repo.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/media-repo.nix @@ -80,9 +80,6 @@ in } ); enable_media_repo = true; - max_upload_size = "512M"; - remote_media_download_burst_count = "512G"; - remote_media_download_per_second = "512G"; rc_federation = { window_size = 1; sleep_limit = 1000; diff --git a/host/Rory-nginx/services/matrix/synapse/workers/module.nix b/host/Rory-nginx/services/matrix/synapse/workers/module.nix index 87e014e..a02540c 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/module.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/module.nix @@ -14,7 +14,7 @@ in ./single/appservice.nix ./single/background.nix ./single/user-dir.nix - + ./auth.nix ./client-reader.nix ./event-creator.nix @@ -25,7 +25,15 @@ in ./pusher.nix ./sync.nix + ./stream-writers/account_data-stream-writer.nix ./stream-writers/event-stream-writer.nix + ./stream-writers/presence-stream-writer.nix + ./stream-writers/push_rule-stream-writer.nix + ./stream-writers/receipt-stream-writer.nix + ./stream-writers/to_device-stream-writer.nix + ./stream-writers/typing-stream-writer.nix + + ./stream-writers/shared-stream-writer.nix ]; options.services.matrix-synapse = { enableWorkers = lib.mkEnableOption "Enable dedicated workers"; @@ -49,9 +57,11 @@ in typingStreamWriters = mkIntOption "Number of typing stream writers"; toDeviceStreamWriters = mkIntOption "Number of to_device stream writers"; accountDataStreamWriters = mkIntOption "Number of account data stream writers"; - receiptsStreamWriters = mkIntOption "Number of read receipt stream writers"; + receiptStreamWriters = mkIntOption "Number of read receipt stream writers"; presenceStreamWriters = mkIntOption "Number of presence stream writers"; pushRuleStreamWriters = mkIntOption "Number of push rule stream writers"; + + sharedStreamWriters = mkIntOption "Number of shared stream writers"; nginxVirtualHostName = lib.mkOption { type = lib.types.str; @@ -66,6 +76,39 @@ in assertion = cfg.enableWorkers -> cfg.nginxVirtualHostName != null; message = "nginxVirtualHostName must be set when enableWorkers is true"; } + + + # Stream types and count limitations: https://github.com/element-hq/synapse/blob/develop/synapse/config/workers.py#L344 + { + assertion = cfg.typingStreamWriters <= 1; + message = "Only one typing stream writer is supported"; + } + { + assertion = cfg.toDeviceStreamWriters <= 1; + message = "Only one to_device stream writer is supported"; + } + { + assertion = cfg.accountDataStreamWriters <= 1; + message = "Only one account data stream writer is supported"; + } + # This may be outdated in the documentation...? + #{ + # assertion = cfg.receiptStreamWriters <= 1; + # message = "Only one receipt stream writer is supported"; + #} + { + assertion = cfg.presenceStreamWriters <= 1; + message = "Only one presence stream writer is supported"; + } + { + assertion = cfg.pushRuleStreamWriters <= 1; + message = "Only one push rule stream writer is supported"; + } + + { + assertion = cfg.sharedStreamWriters <= 1; + message = "Only one shared stream writer is supported"; + } ]; }; } diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/account_data-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/account_data-stream-writer.nix index 200e7c9..dff6e36 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/account_data-stream-writer.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/account_data-stream-writer.nix @@ -7,7 +7,10 @@ let workers = lib.range 0 (cfg.accountDataStreamWriters - 1); workerName = "account_data_stream_writer"; workerRoutes = { - client = [ ]; + client = [ + "^/_matrix/client/(r0|v3|unstable)/.*/tags" + "^/_matrix/client/(r0|v3|unstable)/.*/account_data" + ]; federation = [ ]; media = [ ]; }; diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/presence-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/presence-stream-writer.nix index 84da90d..69d4813 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/presence-stream-writer.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/presence-stream-writer.nix @@ -7,7 +7,7 @@ let workers = lib.range 0 (cfg.presenceStreamWriters - 1); workerName = "presence_stream_writer"; workerRoutes = { - client = [ ]; + client = [ "^/_matrix/client/(api/v1|r0|v3|unstable)/presence/" ]; federation = [ ]; media = [ ]; }; diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rule-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rule-stream-writer.nix new file mode 100644 index 0000000..fbd0327 --- /dev/null +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rule-stream-writer.nix @@ -0,0 +1,118 @@ +{ config, lib, ... }: + +let + cfg = config.services.matrix-synapse; + dbGroup = "medium"; + streamWriterType = "push_rules"; + workers = lib.range 0 (cfg.pushRuleStreamWriters - 1); + workerName = "push_rule_stream_writer"; + workerRoutes = { + client = [ "^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/" ]; + federation = [ ]; + media = [ ]; + }; +in +let + enabledResources = + lib.optionals (lib.length workerRoutes.client > 0) [ "client" ] + ++ lib.optionals (lib.length workerRoutes.federation > 0) [ "federation" ] + ++ lib.optionals (lib.length workerRoutes.media > 0) [ "media" ]; +in +{ + config = lib.mkIf (cfg.pushRuleStreamWriters > 0) { + monitoring.synapse.workerNames = lib.map (index: "${workerName}-${toString index}") workers; + services.matrix-synapse = { + settings = { + instance_map = lib.listToAttrs ( + lib.map (index: { + name = "${workerName}-${toString index}"; + value = { + path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; + }; + }) workers + ); + + stream_writers.${streamWriterType} = lib.map (index: "${workerName}-${toString index}") workers; + }; + + workers = lib.listToAttrs ( + lib.map (index: { + name = "${workerName}-${toString index}"; + value = { + worker_app = "synapse.app.generic_worker"; + worker_listeners = + [ + { + type = "http"; + path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; + resources = [ + { + names = [ "replication" ]; + compress = false; + } + ]; + } + ] + ++ lib.map (type: { + type = "http"; + path = "/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; + mode = "666"; + resources = [ + { + names = [ type ]; + compress = false; + } + ]; + }) enabledResources; + database = ( + import ../../db.nix { + inherit dbGroup; + workerName = "${workerName}-${toString index}"; + } + ); + }; + }) workers + ); + }; + + services.nginx = { + upstreams = lib.listToAttrs ( + lib.map (type: { + name = "${workerName}-${type}"; + value = { + extraConfig = '' + keepalive 32; + least_conn; + ''; + servers = lib.listToAttrs ( + lib.map (index: { + name = "unix:/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; + value = { + max_fails = 0; + }; + }) workers + ); + }; + }) enabledResources + ); + + virtualHosts."${cfg.nginxVirtualHostName}".locations = lib.listToAttrs ( + lib.flatten ( + lib.forEach enabledResources ( + type: + lib.map (route: { + name = route; + value = { + proxyPass = "http://${workerName}-${type}"; + extraConfig = '' + proxy_http_version 1.1; + proxy_set_header Connection ""; + ''; + }; + }) workerRoutes.${type} + ) + ) + ); + }; + }; +} diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rules-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rules-stream-writer.nix deleted file mode 100644 index f4a6acc..0000000 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/push_rules-stream-writer.nix +++ /dev/null @@ -1,118 +0,0 @@ -{ config, lib, ... }: - -let - cfg = config.services.matrix-synapse; - dbGroup = "medium"; - streamWriterType = "push_rules"; - workers = lib.range 0 (cfg.pushRuleStreamWriters - 1); - workerName = "push_rule_stream_writer"; - workerRoutes = { - client = [ ]; - federation = [ ]; - media = [ ]; - }; -in -let - enabledResources = - lib.optionals (lib.length workerRoutes.client > 0) [ "client" ] - ++ lib.optionals (lib.length workerRoutes.federation > 0) [ "federation" ] - ++ lib.optionals (lib.length workerRoutes.media > 0) [ "media" ]; -in -{ - config = lib.mkIf (cfg.pushRuleStreamWriters > 0) { - monitoring.synapse.workerNames = lib.map (index: "${workerName}-${toString index}") workers; - services.matrix-synapse = { - settings = { - instance_map = lib.listToAttrs ( - lib.map (index: { - name = "${workerName}-${toString index}"; - value = { - path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; - }; - }) workers - ); - - stream_writers.${streamWriterType} = lib.map (index: "${workerName}-${toString index}") workers; - }; - - workers = lib.listToAttrs ( - lib.map (index: { - name = "${workerName}-${toString index}"; - value = { - worker_app = "synapse.app.generic_worker"; - worker_listeners = - [ - { - type = "http"; - path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; - resources = [ - { - names = [ "replication" ]; - compress = false; - } - ]; - } - ] - ++ lib.map (type: { - type = "http"; - path = "/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; - mode = "666"; - resources = [ - { - names = [ type ]; - compress = false; - } - ]; - }) enabledResources; - database = ( - import ../../db.nix { - inherit dbGroup; - workerName = "${workerName}-${toString index}"; - } - ); - }; - }) workers - ); - }; - - services.nginx = { - upstreams = lib.listToAttrs ( - lib.map (type: { - name = "${workerName}-${type}"; - value = { - extraConfig = '' - keepalive 32; - least_conn; - ''; - servers = lib.listToAttrs ( - lib.map (index: { - name = "unix:/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; - value = { - max_fails = 0; - }; - }) workers - ); - }; - }) enabledResources - ); - - virtualHosts."${cfg.nginxVirtualHostName}".locations = lib.listToAttrs ( - lib.flatten ( - lib.forEach enabledResources ( - type: - lib.map (route: { - name = route; - value = { - proxyPass = "http://${workerName}-${type}"; - extraConfig = '' - proxy_http_version 1.1; - proxy_set_header Connection ""; - ''; - }; - }) workerRoutes.${type} - ) - ) - ); - }; - }; -} diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipt-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipt-stream-writer.nix new file mode 100644 index 0000000..da4e3a2 --- /dev/null +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipt-stream-writer.nix @@ -0,0 +1,121 @@ +{ config, lib, ... }: + +let + cfg = config.services.matrix-synapse; + dbGroup = "medium"; + streamWriterType = "receipts"; + workers = lib.range 0 (cfg.receiptStreamWriters - 1); + workerName = "receipts_stream_writer"; + workerRoutes = { + client = [ + "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt" + "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers" + ]; + federation = [ ]; + media = [ ]; + }; +in +let + enabledResources = + lib.optionals (lib.length workerRoutes.client > 0) [ "client" ] + ++ lib.optionals (lib.length workerRoutes.federation > 0) [ "federation" ] + ++ lib.optionals (lib.length workerRoutes.media > 0) [ "media" ]; +in +{ + config = lib.mkIf (cfg.receiptStreamWriters > 0) { + monitoring.synapse.workerNames = lib.map (index: "${workerName}-${toString index}") workers; + services.matrix-synapse = { + settings = { + instance_map = lib.listToAttrs ( + lib.map (index: { + name = "${workerName}-${toString index}"; + value = { + path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; + }; + }) workers + ); + + stream_writers.${streamWriterType} = lib.map (index: "${workerName}-${toString index}") workers; + }; + + workers = lib.listToAttrs ( + lib.map (index: { + name = "${workerName}-${toString index}"; + value = { + worker_app = "synapse.app.generic_worker"; + worker_listeners = + [ + { + type = "http"; + path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; + resources = [ + { + names = [ "replication" ]; + compress = false; + } + ]; + } + ] + ++ lib.map (type: { + type = "http"; + path = "/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; + mode = "666"; + resources = [ + { + names = [ type ]; + compress = false; + } + ]; + }) enabledResources; + database = ( + import ../../db.nix { + inherit dbGroup; + workerName = "${workerName}-${toString index}"; + } + ); + }; + }) workers + ); + }; + + services.nginx = { + upstreams = lib.listToAttrs ( + lib.map (type: { + name = "${workerName}-${type}"; + value = { + extraConfig = '' + keepalive 32; + least_conn; + ''; + servers = lib.listToAttrs ( + lib.map (index: { + name = "unix:/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; + value = { + max_fails = 0; + }; + }) workers + ); + }; + }) enabledResources + ); + + virtualHosts."${cfg.nginxVirtualHostName}".locations = lib.listToAttrs ( + lib.flatten ( + lib.forEach enabledResources ( + type: + lib.map (route: { + name = route; + value = { + proxyPass = "http://${workerName}-${type}"; + extraConfig = '' + proxy_http_version 1.1; + proxy_set_header Connection ""; + ''; + }; + }) workerRoutes.${type} + ) + ) + ); + }; + }; +} diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipts-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipts-stream-writer.nix deleted file mode 100644 index 91583d9..0000000 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/receipts-stream-writer.nix +++ /dev/null @@ -1,118 +0,0 @@ -{ config, lib, ... }: - -let - cfg = config.services.matrix-synapse; - dbGroup = "medium"; - streamWriterType = "receipts"; - workers = lib.range 0 (cfg.receiptStreamWriters - 1); - workerName = "receipts_stream_writer"; - workerRoutes = { - client = [ ]; - federation = [ ]; - media = [ ]; - }; -in -let - enabledResources = - lib.optionals (lib.length workerRoutes.client > 0) [ "client" ] - ++ lib.optionals (lib.length workerRoutes.federation > 0) [ "federation" ] - ++ lib.optionals (lib.length workerRoutes.media > 0) [ "media" ]; -in -{ - config = lib.mkIf (cfg.receiptStreamWriters > 0) { - monitoring.synapse.workerNames = lib.map (index: "${workerName}-${toString index}") workers; - services.matrix-synapse = { - settings = { - instance_map = lib.listToAttrs ( - lib.map (index: { - name = "${workerName}-${toString index}"; - value = { - path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; - }; - }) workers - ); - - stream_writers.${streamWriterType} = lib.map (index: "${workerName}-${toString index}") workers; - }; - - workers = lib.listToAttrs ( - lib.map (index: { - name = "${workerName}-${toString index}"; - value = { - worker_app = "synapse.app.generic_worker"; - worker_listeners = - [ - { - type = "http"; - path = "/run/matrix-synapse/${workerName}-${toString index}.sock"; - resources = [ - { - names = [ "replication" ]; - compress = false; - } - ]; - } - ] - ++ lib.map (type: { - type = "http"; - path = "/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; - mode = "666"; - resources = [ - { - names = [ type ]; - compress = false; - } - ]; - }) enabledResources; - database = ( - import ../../db.nix { - inherit dbGroup; - workerName = "${workerName}-${toString index}"; - } - ); - }; - }) workers - ); - }; - - services.nginx = { - upstreams = lib.listToAttrs ( - lib.map (type: { - name = "${workerName}-${type}"; - value = { - extraConfig = '' - keepalive 32; - least_conn; - ''; - servers = lib.listToAttrs ( - lib.map (index: { - name = "unix:/run/matrix-synapse/${workerName}-${type}-${toString index}.sock"; - value = { - max_fails = 0; - }; - }) workers - ); - }; - }) enabledResources - ); - - virtualHosts."${cfg.nginxVirtualHostName}".locations = lib.listToAttrs ( - lib.flatten ( - lib.forEach enabledResources ( - type: - lib.map (route: { - name = route; - value = { - proxyPass = "http://${workerName}-${type}"; - extraConfig = '' - proxy_http_version 1.1; - proxy_set_header Connection ""; - ''; - }; - }) workerRoutes.${type} - ) - ) - ); - }; - }; -} diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/shared-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/shared-stream-writer.nix index 3da4276..5fd0bd0 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/shared-stream-writer.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/shared-stream-writer.nix @@ -19,7 +19,7 @@ let ++ lib.optionals (lib.length workerRoutes.media > 0) [ "media" ]; in { - config = lib.mkIf (cfg.presenceStreamWriters > 0) { + config = lib.mkIf (cfg.sharedStreamWriters > 0) { monitoring.synapse.workerNames = lib.map (index: "${workerName}-${toString index}") workers; services.matrix-synapse = { settings = { @@ -34,10 +34,11 @@ in stream_writers.account_data = lib.map (index: "${workerName}-${toString index}") workers; stream_writers.events = lib.map (index: "${workerName}-${toString index}") workers; - stream_writers.typing = lib.map (index: "${workerName}-${toString index}") workers; - stream_writers.to_device = lib.map (index: "${workerName}-${toString index}") workers; - stream_writers.receipts = lib.map (index: "${workerName}-${toString index}") workers; + stream_writers.presence = lib.map (index: "${workerName}-${toString index}") workers; stream_writers.push_rules = lib.map (index: "${workerName}-${toString index}") workers; + stream_writers.receipts = lib.map (index: "${workerName}-${toString index}") workers; + stream_writers.to_device = lib.map (index: "${workerName}-${toString index}") workers; + stream_writers.typing = lib.map (index: "${workerName}-${toString index}") workers; }; workers = lib.listToAttrs ( diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/to_device-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/to_device-stream-writer.nix index e24c8a4..47c2c0a 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/to_device-stream-writer.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/to_device-stream-writer.nix @@ -7,7 +7,7 @@ let workers = lib.range 0 (cfg.toDeviceStreamWriters - 1); workerName = "to_device_stream_writer"; workerRoutes = { - client = [ ]; + client = [ "^/_matrix/client/(r0|v3|unstable)/sendToDevice/" ]; federation = [ ]; media = [ ]; }; diff --git a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/typing-stream-writer.nix b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/typing-stream-writer.nix index 80e79a9..3986619 100644 --- a/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/typing-stream-writer.nix +++ b/host/Rory-nginx/services/matrix/synapse/workers/stream-writers/typing-stream-writer.nix @@ -7,7 +7,7 @@ let workers = lib.range 0 (cfg.typingStreamWriters - 1); workerName = "typing_stream_writer"; workerRoutes = { - client = [ ]; + client = [ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" ]; federation = [ ]; media = [ ]; }; -- cgit 1.4.1