diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..739b013d4c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
-from twisted.internet import defer, reactor
+from twisted.internet import address, reactor
import synapse
import synapse.events
@@ -37,6 +37,7 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.config.server import ListenerConfig
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import (
@@ -86,7 +87,6 @@ from synapse.replication.tcp.streams import (
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
- TypingStream,
)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
@@ -110,6 +110,7 @@ from synapse.rest.client.v1.room import (
RoomSendEventRestServlet,
RoomStateEventRestServlet,
RoomStateRestServlet,
+ RoomTypingRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
@@ -122,17 +123,18 @@ from synapse.rest.client.v2_alpha.account_data import (
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
+from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.server import HomeServer
-from synapse.storage.data_stores.main.censor_events import CensorEventsStore
-from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
-from synapse.storage.data_stores.main.monthly_active_users import (
+from synapse.server import HomeServer, cache_in_self
+from synapse.storage.databases.main.censor_events import CensorEventsStore
+from synapse.storage.databases.main.media_repository import MediaRepositoryStore
+from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
-from synapse.storage.data_stores.main.presence import UserPresenceState
-from synapse.storage.data_stores.main.search import SearchWorkerStore
-from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
-from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
+from synapse.storage.databases.main.presence import UserPresenceState
+from synapse.storage.databases.main.search import SearchWorkerStore
+from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
+from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
@@ -205,10 +207,30 @@ class KeyUploadServlet(RestServlet):
if body:
# They're actually trying to upload something, proxy to main synapse.
- # Pass through the auth headers, if any, in case the access token
- # is there.
- auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
- headers = {"Authorization": auth_headers}
+
+ # Proxy headers from the original request, such as the auth headers
+ # (in case the access token is there) and the original IP /
+ # User-Agent of the request.
+ headers = {
+ header: request.requestHeaders.getRawHeaders(header, [])
+ for header in (b"Authorization", b"User-Agent")
+ }
+ # Add the previous hop the the X-Forwarded-For header.
+ x_forwarded_for = request.requestHeaders.getRawHeaders(
+ b"X-Forwarded-For", []
+ )
+ if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
+ previous_host = request.client.host.encode("ascii")
+ # If the header exists, add to the comma-separated list of the first
+ # instance of the header. Otherwise, generate a new header.
+ if x_forwarded_for:
+ x_forwarded_for = [
+ x_forwarded_for[0] + b", " + previous_host
+ ] + x_forwarded_for[1:]
+ else:
+ x_forwarded_for = [previous_host]
+ headers[b"X-Forwarded-For"] = x_forwarded_for
+
try:
result = await self.http_client.post_json_get_json(
self.main_uri + request.uri.decode("ascii"), body, headers=headers
@@ -353,9 +375,8 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing()
- @defer.inlineCallbacks
- def notify_from_replication(self, states, stream_id):
- parties = yield get_interested_parties(self.store, states)
+ async def notify_from_replication(self, states, stream_id):
+ parties = await get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
@@ -365,8 +386,7 @@ class GenericWorkerPresence(BasePresenceHandler):
users=users_to_states.keys(),
)
- @defer.inlineCallbacks
- def process_replication_rows(self, token, rows):
+ async def process_replication_rows(self, token, rows):
states = [
UserPresenceState(
row.user_id,
@@ -384,7 +404,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.user_to_current_state[state.user_id] = state
stream_id = token
- yield self.notify_from_replication(states, stream_id)
+ await self.notify_from_replication(states, stream_id)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
@@ -430,37 +450,6 @@ class GenericWorkerPresence(BasePresenceHandler):
await self._bump_active_client(user_id=user_id)
-class GenericWorkerTyping(object):
- def __init__(self, hs):
- self._latest_room_serial = 0
- self._reset()
-
- def _reset(self):
- """
- Reset the typing handler's data caches.
- """
- # map room IDs to serial numbers
- self._room_serials = {}
- # map room IDs to sets of users currently typing
- self._room_typing = {}
-
- def process_replication_rows(self, token, rows):
- if self._latest_room_serial > token:
- # The master has gone backwards. To prevent inconsistent data, just
- # clear everything.
- self._reset()
-
- # Set the latest serial token to whatever the server gave us.
- self._latest_room_serial = token
-
- for row in rows:
- self._room_serials[row.room_id] = token
- self._room_typing[row.room_id] = row.user_ids
-
- def get_current_token(self) -> int:
- return self._latest_room_serial
-
-
class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
@@ -490,37 +479,27 @@ class GenericWorkerSlavedStore(
SearchWorkerStore,
BaseSlavedStore,
):
- def __init__(self, database, db_conn, hs):
- super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs)
+ pass
- # We pull out the current federation stream position now so that we
- # always have a known value for the federation position in memory so
- # that we don't have to bounce via a deferred once when we start the
- # replication streams.
- self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
- def _get_federation_out_pos(self, db_conn):
- sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
- sql = self.database_engine.convert_param_style(sql)
+class GenericWorkerServer(HomeServer):
+ DATASTORE_CLASS = GenericWorkerSlavedStore
- txn = db_conn.cursor()
- txn.execute(sql, ("federation",))
- rows = txn.fetchall()
- txn.close()
+ def _listen_http(self, listener_config: ListenerConfig):
+ port = listener_config.port
+ bind_addresses = listener_config.bind_addresses
- return rows[0][0] if rows else -1
+ assert listener_config.http_options is not None
+ site_tag = listener_config.http_options.tag
+ if site_tag is None:
+ site_tag = port
-class GenericWorkerServer(HomeServer):
- DATASTORE_CLASS = GenericWorkerSlavedStore
+ # We always include a health resource.
+ resources = {"/health": HealthResource()}
- def _listen_http(self, listener_config):
- port = listener_config["port"]
- bind_addresses = listener_config["bind_addresses"]
- site_tag = listener_config.get("tag", port)
- resources = {}
- for res in listener_config["resources"]:
- for name in res["names"]:
+ for res in listener_config.http_options.resources:
+ for name in res.names:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
@@ -550,6 +529,7 @@ class GenericWorkerServer(HomeServer):
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
+ RoomTypingRestServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
@@ -590,7 +570,7 @@ class GenericWorkerServer(HomeServer):
" repository is disabled. Ignoring."
)
- if name == "openid" and "federation" not in res["names"]:
+ if name == "openid" and "federation" not in res.names:
# Only load the openid resource separately if federation resource
# is not specified since federation resource includes openid
# resource.
@@ -625,19 +605,19 @@ class GenericWorkerServer(HomeServer):
logger.info("Synapse worker now listening on port %d", port)
- def start_listening(self, listeners):
+ def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
- if listener["type"] == "http":
+ if listener.type == "http":
self._listen_http(listener)
- elif listener["type"] == "manhole":
+ elif listener.type == "manhole":
_base.listen_tcp(
- listener["bind_addresses"],
- listener["port"],
+ listener.bind_addresses,
+ listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
- elif listener["type"] == "metrics":
+ elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
@@ -646,31 +626,29 @@ class GenericWorkerServer(HomeServer):
)
)
else:
- _base.listen_metrics(listener["bind_addresses"], listener["port"])
+ _base.listen_metrics(listener.bind_addresses, listener.port)
else:
- logger.warning("Unrecognized listener type: %s", listener["type"])
+ logger.warning("Unsupported listener type: %s", listener.type)
self.get_tcp_replication().start_replication(self)
- def remove_pusher(self, app_id, push_key, user_id):
+ async def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
- def build_replication_data_handler(self):
+ @cache_in_self
+ def get_replication_data_handler(self):
return GenericWorkerReplicationHandler(self)
- def build_presence_handler(self):
+ @cache_in_self
+ def get_presence_handler(self):
return GenericWorkerPresence(self)
- def build_typing_handler(self):
- return GenericWorkerTyping(self)
-
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
super(GenericWorkerReplicationHandler, self).__init__(hs)
self.store = hs.get_datastore()
- self.typing_handler = hs.get_typing_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()
@@ -707,11 +685,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
- elif stream_name == TypingStream.NAME:
- self.typing_handler.process_replication_rows(token, rows)
- self.notifier.on_new_event(
- "typing_key", token, rooms=[row.room_id for row in rows]
- )
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
@@ -738,6 +711,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
except Exception:
logger.exception("Error processing replication")
+ async def on_position(self, stream_name: str, instance_name: str, token: int):
+ await super().on_position(stream_name, instance_name, token)
+ # Also call on_rdata to ensure that stream positions are properly reset.
+ await self.on_rdata(stream_name, instance_name, token, [])
+
def stop_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return
@@ -781,19 +759,11 @@ class FederationSenderHandler(object):
self.federation_sender = hs.get_federation_sender()
self._hs = hs
- # if the worker is restarted, we want to pick up where we left off in
- # the replication stream, so load the position from the database.
- #
- # XXX is this actually worthwhile? Whenever the master is restarted, we'll
- # drop some rows anyway (which is mostly fine because we're only dropping
- # typing and presence notifications). If the replication stream is
- # unreliable, why do we do all this hoop-jumping to store the position in the
- # database? See also https://github.com/matrix-org/synapse/issues/7535.
- #
- self.federation_position = self.store.federation_out_pos_startup
+ # Stores the latest position in the federation stream we've gotten up
+ # to. This is always set before we use it.
+ self.federation_position = None
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
- self._last_ack = self.federation_position
def on_start(self):
# There may be some events that are persisted but haven't been sent,
@@ -901,7 +871,6 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position)
- self._last_ack = current_position
except Exception:
logger.exception("Error updating federation stream position")
@@ -929,7 +898,7 @@ def start(config_options):
)
if config.worker_app == "synapse.app.appservice":
- if config.notify_appservices:
+ if config.appservice.notify_appservices:
sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
@@ -939,13 +908,13 @@ def start(config_options):
sys.exit(1)
# Force the appservice to start since they will be disabled in the main config
- config.notify_appservices = True
+ config.appservice.notify_appservices = True
else:
# For other worker types we force this to off.
- config.notify_appservices = False
+ config.appservice.notify_appservices = False
if config.worker_app == "synapse.app.pusher":
- if config.start_pushers:
+ if config.server.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
@@ -955,13 +924,13 @@ def start(config_options):
sys.exit(1)
# Force the pushers to start since they will be disabled in the main config
- config.start_pushers = True
+ config.server.start_pushers = True
else:
# For other worker types we force this to off.
- config.start_pushers = False
+ config.server.start_pushers = False
if config.worker_app == "synapse.app.user_dir":
- if config.update_user_directory:
+ if config.server.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
@@ -971,13 +940,13 @@ def start(config_options):
sys.exit(1)
# Force the pushers to start since they will be disabled in the main config
- config.update_user_directory = True
+ config.server.update_user_directory = True
else:
# For other worker types we force this to off.
- config.update_user_directory = False
+ config.server.update_user_directory = False
if config.worker_app == "synapse.app.federation_sender":
- if config.send_federation:
+ if config.worker.send_federation:
sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
@@ -987,10 +956,10 @@ def start(config_options):
sys.exit(1)
# Force the pushers to start since they will be disabled in the main config
- config.send_federation = True
+ config.worker.send_federation = True
else:
# For other worker types we force this to off.
- config.send_federation = False
+ config.worker.send_federation = False
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|