From 1124111a12c3ab35f8b68d9031695aec8b2c7c50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2020 17:15:40 +0100 Subject: Allow censoring of events to happen on workers. (#7492) This is safe as we can now write to cache invalidation stream on workers, and is required for when we move event persistence off master. --- synapse/app/generic_worker.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/app/generic_worker.py') diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 667ad20428..bccb1140b2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -122,6 +122,7 @@ from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer +from synapse.storage.data_stores.main.censor_events import CensorEventsStore from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore from synapse.storage.data_stores.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, @@ -442,6 +443,7 @@ class GenericWorkerSlavedStore( SlavedGroupServerStore, SlavedAccountDataStore, SlavedPusherStore, + CensorEventsStore, SlavedEventStore, SlavedKeyStore, RoomStore, -- cgit 1.5.1 From 4734a7bbe4d08d68c5f04dd76cd5bcfb4cd9b6be Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2020 14:01:39 +0100 Subject: Move EventStream handling into default ReplicationDataHandler (#7493) This is so that the logic can happen on both master and workers when we move event persistence out. 
--- changelog.d/7493.misc | 1 + synapse/app/generic_worker.py | 33 ++------------------------------- synapse/replication/tcp/client.py | 37 +++++++++++++++++++++++++++++++++---- synapse/server.py | 2 +- synapse/server.pyi | 3 +++ 5 files changed, 40 insertions(+), 36 deletions(-) create mode 100644 changelog.d/7493.misc (limited to 'synapse/app/generic_worker.py') diff --git a/changelog.d/7493.misc b/changelog.d/7493.misc new file mode 100644 index 0000000000..575c55a99b --- /dev/null +++ b/changelog.d/7493.misc @@ -0,0 +1 @@ +Move EventStream handling into default ReplicationDataHandler. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index bccb1140b2..2e3add7ac5 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -26,7 +26,6 @@ from twisted.web.resource import NoResource import synapse import synapse.events -from synapse.api.constants import EventTypes from synapse.api.errors import HttpResponseException, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, @@ -81,11 +80,6 @@ from synapse.replication.tcp.streams import ( ToDeviceStream, TypingStream, ) -from synapse.replication.tcp.streams.events import ( - EventsStream, - EventsStreamEventRow, - EventsStreamRow, -) from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -633,7 +627,7 @@ class GenericWorkerServer(HomeServer): class GenericWorkerReplicationHandler(ReplicationDataHandler): def __init__(self, hs): - super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore()) + super(GenericWorkerReplicationHandler, self).__init__(hs) self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() @@ -659,30 +653,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): stream_name, token, rows ) - if stream_name == EventsStream.NAME: - # We shouldn't get multiple rows per token for events 
stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - if row.type != EventsStreamEventRow.TypeId: - continue - assert isinstance(row, EventsStreamRow) - - event = await self.store.get_event( - row.data.event_id, allow_rejected=True - ) - if event.rejected_reason: - continue - - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users - ) - - await self.pusher_pool.on_new_notifications(token, token) - elif stream_name == PushRulesStream.NAME: + if stream_name == PushRulesStream.NAME: self.notifier.on_new_event( "push_rules_key", token, users=[row.user_id for row in rows] ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 20cb8a654f..28826302f5 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -16,12 +16,17 @@ """ import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Tuple from twisted.internet.protocol import ReconnectingClientFactory -from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.api.constants import EventTypes from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamEventRow, + EventsStreamRow, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -83,8 +88,10 @@ class ReplicationDataHandler: to handle updates in additional ways. 
""" - def __init__(self, store: BaseSlavedStore): - self.store = store + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastore() + self.pusher_pool = hs.get_pusherpool() + self.notifier = hs.get_notifier() async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list @@ -102,6 +109,28 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if stream_name == EventsStream.NAME: + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + if row.type != EventsStreamEventRow.TypeId: + continue + assert isinstance(row, EventsStreamRow) + + event = await self.store.get_event( + row.data.event_id, allow_rejected=True + ) + if event.rejected_reason: + continue + + extra_users = () # type: Tuple[str, ...] + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event(event, token, max_token, extra_users) + + await self.pusher_pool.on_new_notifications(token, token) + async def on_position(self, stream_name: str, instance_name: str, token: int): self.store.process_replication_rows(stream_name, instance_name, token, []) diff --git a/synapse/server.py b/synapse/server.py index b4aea81e24..c530f1aa1a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -581,7 +581,7 @@ class HomeServer(object): return ReplicationStreamer(self) def build_replication_data_handler(self): - return ReplicationDataHandler(self.get_datastore()) + return ReplicationDataHandler(self) def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/server.pyi b/synapse/server.pyi index 31a9cc0389..9e7fad7e6e 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -19,6 +19,7 @@ import synapse.handlers.room_member import 
synapse.handlers.set_password import synapse.http.client import synapse.notifier +import synapse.push.pusherpool import synapse.replication.tcp.client import synapse.replication.tcp.handler import synapse.rest.media.v1.media_repository @@ -133,3 +134,5 @@ class HomeServer(object): pass def get_macaroon_generator(self) -> synapse.handlers.auth.MacaroonGenerator: pass + def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool: + pass -- cgit 1.5.1 From 03aff4c75ed3b0b106ed1395b3d03b1ab9b013a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2020 17:22:47 +0100 Subject: Add a worker store for search insertion. (#7516) This is required as both event persistence and the background update needs access to this function. It should be perfectly safe for two workers to write to that table at the same time. --- changelog.d/7516.misc | 1 + synapse/app/generic_worker.py | 2 + synapse/storage/data_stores/main/search.py | 96 +++++++++++++++--------------- 3 files changed, 52 insertions(+), 47 deletions(-) create mode 100644 changelog.d/7516.misc (limited to 'synapse/app/generic_worker.py') diff --git a/changelog.d/7516.misc b/changelog.d/7516.misc new file mode 100644 index 0000000000..94b0fd49b2 --- /dev/null +++ b/changelog.d/7516.misc @@ -0,0 +1 @@ +Add a worker store for search insertion, required for moving event persistence off master. 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2e3add7ac5..ab801108ca 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -122,6 +122,7 @@ from synapse.storage.data_stores.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) from synapse.storage.data_stores.main.presence import UserPresenceState +from synapse.storage.data_stores.main.search import SearchWorkerStore from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore from synapse.storage.data_stores.main.user_directory import UserDirectoryStore from synapse.types import ReadReceipt @@ -451,6 +452,7 @@ class GenericWorkerSlavedStore( SlavedFilteringStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, + SearchWorkerStore, BaseSlavedStore, ): def __init__(self, database, db_conn, hs): diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py index ee75b92344..13f49d8060 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py @@ -37,7 +37,55 @@ SearchEntry = namedtuple( ) -class SearchBackgroundUpdateStore(SQLBaseStore): +class SearchWorkerStore(SQLBaseStore): + def store_search_entries_txn(self, txn, entries): + """Add entries to the search table + + Args: + txn (cursor): + entries (iterable[SearchEntry]): + entries to be added to the table + """ + if not self.hs.config.enable_search: + return + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search" + " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " VALUES (?,?,?,to_tsvector('english', ?),?,?)" + ) + + args = ( + ( + entry.event_id, + entry.room_id, + entry.key, + entry.value, + entry.stream_ordering, + entry.origin_server_ts, + ) + for entry in entries + ) + + txn.executemany(sql, args) + + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" 
+ " VALUES (?,?,?,?)" + ) + args = ( + (entry.event_id, entry.room_id, entry.key, entry.value) + for entry in entries + ) + + txn.executemany(sql, args) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + +class SearchBackgroundUpdateStore(SearchWorkerStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" @@ -296,52 +344,6 @@ class SearchBackgroundUpdateStore(SQLBaseStore): return num_rows - def store_search_entries_txn(self, txn, entries): - """Add entries to the search table - - Args: - txn (cursor): - entries (iterable[SearchEntry]): - entries to be added to the table - """ - if not self.hs.config.enable_search: - return - if isinstance(self.database_engine, PostgresEngine): - sql = ( - "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" - " VALUES (?,?,?,to_tsvector('english', ?),?,?)" - ) - - args = ( - ( - entry.event_id, - entry.room_id, - entry.key, - entry.value, - entry.stream_ordering, - entry.origin_server_ts, - ) - for entry in entries - ) - - txn.executemany(sql, args) - - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - args = ( - (entry.event_id, entry.room_id, entry.key, entry.value) - for entry in entries - ) - - txn.executemany(sql, args) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - class SearchStore(SearchBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): -- cgit 1.5.1 From 51055c8c4409e70e8f310fce420b2f2f7f7a257a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 12:24:48 +0100 Subject: Allow ReplicationRestResource to be added to workers (#7515) This allows workers to talk to each other over HTTP replication. 
--- changelog.d/7515.misc | 1 + synapse/app/generic_worker.py | 4 ++++ synapse/replication/http/__init__.py | 13 ++++++++----- 3 files changed, 13 insertions(+), 5 deletions(-) create mode 100644 changelog.d/7515.misc (limited to 'synapse/app/generic_worker.py') diff --git a/changelog.d/7515.misc b/changelog.d/7515.misc new file mode 100644 index 0000000000..48f3044f90 --- /dev/null +++ b/changelog.d/7515.misc @@ -0,0 +1 @@ +Allow `ReplicationRestResource` to be added to workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index ab801108ca..506b70443b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -47,6 +47,7 @@ from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -570,6 +571,9 @@ class GenericWorkerServer(HomeServer): if name in ["keys", "federation"]: resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) + if name == "replication": + resources[REPLICATION_PREFIX] = ReplicationRestResource(self) + root_resource = create_resource_tree(resources, NoResource()) _base.listen_tcp( diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 4613b2538c..a909744e93 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -34,9 +34,12 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) - membership.register_servlets(hs, self) federation.register_servlets(hs, 
self) - login.register_servlets(hs, self) - register.register_servlets(hs, self) - devices.register_servlets(hs, self) - streams.register_servlets(hs, self) + + # The following can't currently be instantiated on workers. + if hs.config.worker.worker_app is None: + membership.register_servlets(hs, self) + login.register_servlets(hs, self) + register.register_servlets(hs, self) + devices.register_servlets(hs, self) + streams.register_servlets(hs, self) -- cgit 1.5.1 From 0bbbd10513008d30c17eb1d1e7ba1d091fb44ec7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 21 May 2020 14:36:46 +0100 Subject: Stub out GET presence requests in the frontend proxy (#7545) We don't really make any promises about returning accurate presence data when presence is disabled, so we may as well just return a static response, rather than making the master handle a request. --- changelog.d/7545.misc | 1 + synapse/app/generic_worker.py | 21 ++++----------------- 2 files changed, 5 insertions(+), 17 deletions(-) create mode 100644 changelog.d/7545.misc (limited to 'synapse/app/generic_worker.py') diff --git a/changelog.d/7545.misc b/changelog.d/7545.misc new file mode 100644 index 0000000000..177ec883e2 --- /dev/null +++ b/changelog.d/7545.misc @@ -0,0 +1 @@ +Make worker processes return a stubbed-out response to `GET /presence` requests. 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 506b70443b..d751c9772b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -26,7 +26,7 @@ from twisted.web.resource import NoResource import synapse import synapse.events -from synapse.api.errors import HttpResponseException, SynapseError +from synapse.api.errors import SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -137,31 +137,18 @@ logger = logging.getLogger("synapse.app.generic_worker") class PresenceStatusStubServlet(RestServlet): """If presence is disabled this servlet can be used to stub out setting - presence status, while proxying the getters to the master instance. + presence status. """ PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status") def __init__(self, hs): super(PresenceStatusStubServlet, self).__init__() - self.http_client = hs.get_simple_http_client() self.auth = hs.get_auth() - self.main_uri = hs.config.worker_main_http_uri async def on_GET(self, request, user_id): - # Pass through the auth headers, if any, in case the access token - # is there. - auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) - headers = {"Authorization": auth_headers} - - try: - result = await self.http_client.get_json( - self.main_uri + request.uri.decode("ascii"), headers=headers - ) - except HttpResponseException as e: - raise e.to_synapse_error() - - return 200, result + await self.auth.get_user_by_req(request) + return 200, {"presence": "offline"} async def on_PUT(self, request, user_id): await self.auth.get_user_by_req(request) -- cgit 1.5.1