diff options
author | Erik Johnston <erik@matrix.org> | 2018-02-06 10:55:40 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-02-07 10:32:32 +0000 |
commit | 8ec2e638be6f9205451d51dc839c94d0dd8999d4 (patch) | |
tree | 9ee2139137705f86b68c5dcac09d2c1a9a8dca1f | |
parent | Add replication http endpoint for event sending (diff) | |
download | synapse-8ec2e638be6f9205451d51dc839c94d0dd8999d4.tar.xz |
Add event_creator worker
Diffstat (limited to '')
-rw-r--r-- | synapse/app/event_creator.py | 170 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 20 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 1 |
3 files changed, 190 insertions, 1 deletions
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py new file mode 100644 index 0000000000..b2ce399258 --- /dev/null +++ b/synapse/app/event_creator.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import sys + +import synapse +from synapse import events +from synapse.app import _base +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.http.server import JsonResource +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.room import RoomSendEventRestServlet +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.versionstring import get_version_string +from twisted.internet import reactor +from twisted.web.resource import Resource + +logger = logging.getLogger("synapse.app.event_creator") + + +class EventCreatorSlavedStore( + SlavedDeviceStore, + SlavedClientIpStore, + SlavedApplicationServiceStore, + SlavedEventStore, + SlavedRegistrationStore, + RoomStore, + BaseSlavedStore, +): + pass + + +class EventCreatorServer(HomeServer): + def setup(self): + logger.info("Setting up.") + self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + RoomSendEventRestServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ) + ) + + logger.info("Synapse event creator now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ) + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse event creator", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.event_creator" + + assert config.worker_replication_http_port is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = EventCreatorServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + _base.start_worker_reactor("synapse-event-creator", config) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 8acb5df0f3..f8c164b48b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -21,6 +21,7 @@ from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamStore +from synapse.storage.signatures import SignatureStore from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker @@ -170,6 +171,25 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): get_federation_out_pos = DataStore.get_federation_out_pos.__func__ update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + get_latest_event_ids_and_hashes_in_room = ( + DataStore.get_latest_event_ids_and_hashes_in_room.__func__ + ) + _get_latest_event_ids_and_hashes_in_room = ( + DataStore._get_latest_event_ids_and_hashes_in_room.__func__ + ) + _get_event_reference_hashes_txn = ( + DataStore._get_event_reference_hashes_txn.__func__ + ) + add_event_hashes = ( + DataStore.add_event_hashes.__func__ + ) + get_event_reference_hashes = ( + SignatureStore.__dict__["get_event_reference_hashes"] + ) + get_event_reference_hash = ( + SignatureStore.__dict__["get_event_reference_hash"] + ) + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fbb2fc36e4..817fd47842 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -186,7 +186,6 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomSendEventRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() self.event_creation_hander = hs.get_event_creation_handler() def register(self, http_server): |