diff options
Diffstat (limited to '')
-rwxr-xr-x | synapse/app/homeserver.py | 161 | ||||
-rwxr-xr-x | synapse/app/synctl.py | 2 | ||||
-rw-r--r-- | synapse/appservice/__init__.py | 176 | ||||
-rw-r--r-- | synapse/appservice/api.py | 108 |
4 files changed, 418 insertions, 29 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 9b23c58abe..dff08c8bc5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -14,7 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import prepare_database, UpgradeDatabaseException +import sys +sys.dont_write_bytecode = True + +from synapse.storage import ( + prepare_database, prepare_sqlite3_database, UpgradeDatabaseException, +) from synapse.server import HomeServer @@ -27,17 +32,21 @@ from twisted.web.resource import Resource from twisted.web.static import File from twisted.web.server import Site from synapse.http.server import JsonResource, RootRedirect -from synapse.media.v0.content_repository import ContentRepoResource -from synapse.media.v1.media_repository import MediaRepositoryResource +from synapse.rest.appservice.v1 import AppServiceRestResource +from synapse.rest.media.v0.content_repository import ContentRepoResource +from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX + SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX, + STATIC_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext +from synapse.rest.client.v1 import ClientV1RestResource +from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource from daemonize import Daemonize import twisted.manhole.telnet @@ -47,7 +56,8 @@ import synapse import logging import os import re -import sys +import resource +import subprocess import sqlite3 import syweb @@ -60,16 +70,25 @@ class SynapseHomeServer(HomeServer): return MatrixFederationHttpClient(self) def build_resource_for_client(self): - return JsonResource() + return ClientV1RestResource(self) + + def build_resource_for_client_v2_alpha(self): + return ClientV2AlphaRestResource(self) def build_resource_for_federation(self): - return JsonResource() + return JsonResource(self) + + def build_resource_for_app_services(self): + return AppServiceRestResource(self) def build_resource_for_web_client(self): syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") return File(webclient_path) # TODO configurable? + def build_resource_for_static_content(self): + return File("static") + def build_resource_for_content_repo(self): return ContentRepoResource( self, self.upload_dir, self.auth, self.content_addr @@ -86,7 +105,9 @@ class SynapseHomeServer(HomeServer): "sqlite3", self.get_db_name(), check_same_thread=False, cp_min=1, - cp_max=1 + cp_max=1, + cp_openfun=prepare_database, # Prepare the database for each conn + # so that :memory: sqlite works ) def create_resource_tree(self, web_client, redirect_root_to_web_client): @@ -105,11 +126,15 @@ class SynapseHomeServer(HomeServer): # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ] desired_tree = [ (CLIENT_PREFIX, self.get_resource_for_client()), + (CLIENT_V2_ALPHA_PREFIX, self.get_resource_for_client_v2_alpha()), (FEDERATION_PREFIX, self.get_resource_for_federation()), (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()), (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), + (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), + (STATIC_PREFIX, self.get_resource_for_static_content()), ] + if web_client: logger.info("Adding the web client.") desired_tree.append((WEB_CLIENT_PREFIX, @@ -125,11 +150,11 @@ class SynapseHomeServer(HomeServer): # instead, we'll store a copy of this mapping so we can actually add # extra resources to existing nodes. See self._resource_id for the key. resource_mappings = {} - for (full_path, resource) in desired_tree: - logger.info("Attaching %s to path %s", resource, full_path) + for full_path, res in desired_tree: + logger.info("Attaching %s to path %s", res, full_path) last_resource = self.root_resource for path_seg in full_path.split('/')[1:-1]: - if not path_seg in last_resource.listNames(): + if path_seg not in last_resource.listNames(): # resource doesn't exist, so make a "dummy resource" child_resource = Resource() last_resource.putChild(path_seg, child_resource) @@ -157,12 +182,12 @@ class SynapseHomeServer(HomeServer): child_name) child_resource = resource_mappings[child_res_id] # steal the children - resource.putChild(child_name, child_resource) + res.putChild(child_name, child_resource) # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, resource) + last_resource.putChild(last_path_seg, res) res_id = self._resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = resource + resource_mappings[res_id] = res return self.root_resource @@ -194,6 +219,83 @@ class SynapseHomeServer(HomeServer): logger.info("Synapse now listening on port %d", unsecure_port) +def get_version_string(): + try: + null = open(os.devnull, 'w') + cwd = os.path.dirname(os.path.abspath(__file__)) + try: + git_branch = subprocess.check_output( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + git_branch = "b=" + git_branch + except subprocess.CalledProcessError: + git_branch = "" + + try: + git_tag = subprocess.check_output( + ['git', 'describe', '--exact-match'], + stderr=null, + cwd=cwd, + ).strip() + git_tag = "t=" + git_tag + except subprocess.CalledProcessError: + git_tag = "" + + try: + git_commit = subprocess.check_output( + ['git', 'rev-parse', '--short', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + except subprocess.CalledProcessError: + git_commit = "" + + try: + dirty_string = "-this_is_a_dirty_checkout" + is_dirty = subprocess.check_output( + ['git', 'describe', '--dirty=' + dirty_string], + stderr=null, + cwd=cwd, + ).strip().endswith(dirty_string) + + git_dirty = "dirty" if is_dirty else "" + except subprocess.CalledProcessError: + git_dirty = "" + + if git_branch or git_tag or git_commit or git_dirty: + git_version = ",".join( + s for s in + (git_branch, git_tag, git_commit, git_dirty,) + if s + ) + + return ( + "Synapse/%s (%s)" % ( + synapse.__version__, git_version, + ) + ).encode("ascii") + except Exception as e: + logger.warn("Failed to check for git repository: %s", e) + + return ("Synapse/%s" % (synapse.__version__,)).encode("ascii") + + +def change_resource_limit(soft_file_no): + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + + if not soft_file_no: + soft_file_no = hard + + resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) + + logger.info("Set file limit to: %d", soft_file_no) + except (ValueError, resource.error) as e: + logger.warn("Failed to set file limit: %s", e) + + def setup(config_options, should_run=True): config = HomeServerConfig.load_config( "Synapse Homeserver", @@ -205,8 +307,10 @@ def setup(config_options, should_run=True): check_requirements() + version_string = get_version_string() + logger.info("Server hostname: %s", config.server_name) - logger.info("Server version: %s", synapse.__version__) + logger.info("Server version: %s", version_string) if re.search(":[0-9]+$", config.server_name): domain_with_port = config.server_name @@ -223,10 +327,9 @@ def setup(config_options, should_run=True): tls_context_factory=tls_context_factory, config=config, content_addr=config.content_addr, + version_string=version_string, ) - hs.register_servlets() - hs.create_resource_tree( web_client=config.webclient, redirect_root_to_web_client=True, @@ -238,6 +341,7 @@ def setup(config_options, should_run=True): try: with sqlite3.connect(db_name) as db_conn: + prepare_sqlite3_database(db_conn) prepare_database(db_conn) except UpgradeDatabaseException: sys.stderr.write( @@ -249,14 +353,6 @@ def setup(config_options, should_run=True): logger.info("Database prepared in %s.", db_name) - db_pool = hs.get_db_pool() - - if db_name == ":memory:": - # Memory databases will need to be setup each time they are opened. - reactor.callWhenRunning( - db_pool.runWithConnection, prepare_database - ) - if config.manhole: f = twisted.manhole.telnet.ShellFactory() f.username = "matrix" @@ -267,17 +363,24 @@ def setup(config_options, should_run=True): bind_port = config.bind_port if config.no_tls: bind_port = None + hs.start_listening(bind_port, config.unsecure_port) + hs.get_pusherpool().start() + hs.get_state_handler().start_caching() + hs.get_datastore().start_profiling() + hs.get_replication_layer().start_get_pdu_cache() + if not should_run: return if config.daemonize: print config.pid_file + daemon = Daemonize( app="synapse-homeserver", pid=config.pid_file, - action=run, + action=lambda: run(config), auto_close_fds=False, verbose=True, logger=logger, @@ -285,7 +388,7 @@ def setup(config_options, should_run=True): daemon.start() else: - reactor.run() + run(config) class SynapseService(service.Service): @@ -299,8 +402,10 @@ class SynapseService(service.Service): return self._port.stopListening() -def run(): +def run(config): with LoggingContext("run"): + change_resource_limit(config.soft_file_limit) + reactor.run() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 363c20f994..3a70a248dc 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -19,7 +19,7 @@ import os import subprocess import signal -SYNAPSE = ["python", "-m", "synapse.app.homeserver"] +SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"] CONFIGFILE = "homeserver.yaml" PIDFILE = "homeserver.pid" diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py new file mode 100644 index 0000000000..a268a6bcc4 --- /dev/null +++ b/synapse/appservice/__init__.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventTypes + +import logging +import re + +logger = logging.getLogger(__name__) + + +class ApplicationService(object): + """Defines an application service. This definition is mostly what is + provided to the /register AS API. + + Provides methods to check if this service is "interested" in events. + """ + NS_USERS = "users" + NS_ALIASES = "aliases" + NS_ROOMS = "rooms" + # The ordering here is important as it is used to map database values (which + # are stored as ints representing the position in this list) to namespace + # values. + NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] + + def __init__(self, token, url=None, namespaces=None, hs_token=None, + sender=None, txn_id=None): + self.token = token + self.url = url + self.hs_token = hs_token + self.sender = sender + self.namespaces = self._check_namespaces(namespaces) + self.txn_id = txn_id + + def _check_namespaces(self, namespaces): + # Sanity check that it is of the form: + # { + # users: [ {regex: "[A-z]+.*", exclusive: true}, ...], + # aliases: [ {regex: "[A-z]+.*", exclusive: true}, ...], + # rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...], + # } + if not namespaces: + return None + + for ns in ApplicationService.NS_LIST: + if ns not in namespaces: + namespaces[ns] = [] + continue + + if type(namespaces[ns]) != list: + raise ValueError("Bad namespace value for '%s'" % ns) + for regex_obj in namespaces[ns]: + if not isinstance(regex_obj, dict): + raise ValueError("Expected dict regex for ns '%s'" % ns) + if not isinstance(regex_obj.get("exclusive"), bool): + raise ValueError( + "Expected bool for 'exclusive' in ns '%s'" % ns + ) + if not isinstance(regex_obj.get("regex"), basestring): + raise ValueError( + "Expected string for 'regex' in ns '%s'" % ns + ) + return namespaces + + def _matches_regex(self, test_string, namespace_key, return_obj=False): + if not isinstance(test_string, basestring): + logger.error( + "Expected a string to test regex against, but got %s", + test_string + ) + return False + + for regex_obj in self.namespaces[namespace_key]: + if re.match(regex_obj["regex"], test_string): + if return_obj: + return regex_obj + return True + return False + + def _is_exclusive(self, ns_key, test_string): + regex_obj = self._matches_regex(test_string, ns_key, return_obj=True) + if regex_obj: + return regex_obj["exclusive"] + return False + + def _matches_user(self, event, member_list): + if (hasattr(event, "sender") and + self.is_interested_in_user(event.sender)): + return True + # also check m.room.member state key + if (hasattr(event, "type") and event.type == EventTypes.Member + and hasattr(event, "state_key") + and self.is_interested_in_user(event.state_key)): + return True + # check joined member events + for member in member_list: + if self.is_interested_in_user(member.state_key): + return True + return False + + def _matches_room_id(self, event): + if hasattr(event, "room_id"): + return self.is_interested_in_room(event.room_id) + return False + + def _matches_aliases(self, event, alias_list): + for alias in alias_list: + if self.is_interested_in_alias(alias): + return True + return False + + def is_interested(self, event, restrict_to=None, aliases_for_event=None, + member_list=None): + """Check if this service is interested in this event. + + Args: + event(Event): The event to check. + restrict_to(str): The namespace to restrict regex tests to. + aliases_for_event(list): A list of all the known room aliases for + this event. + member_list(list): A list of all joined room members in this room. + Returns: + bool: True if this service would like to know about this event. + """ + if aliases_for_event is None: + aliases_for_event = [] + if member_list is None: + member_list = [] + + if restrict_to and restrict_to not in ApplicationService.NS_LIST: + # this is a programming error, so fail early and raise a general + # exception + raise Exception("Unexpected restrict_to value: %s". restrict_to) + + if not restrict_to: + return (self._matches_user(event, member_list) + or self._matches_aliases(event, aliases_for_event) + or self._matches_room_id(event)) + elif restrict_to == ApplicationService.NS_ALIASES: + return self._matches_aliases(event, aliases_for_event) + elif restrict_to == ApplicationService.NS_ROOMS: + return self._matches_room_id(event) + elif restrict_to == ApplicationService.NS_USERS: + return self._matches_user(event, member_list) + + def is_interested_in_user(self, user_id): + return self._matches_regex(user_id, ApplicationService.NS_USERS) + + def is_interested_in_alias(self, alias): + return self._matches_regex(alias, ApplicationService.NS_ALIASES) + + def is_interested_in_room(self, room_id): + return self._matches_regex(room_id, ApplicationService.NS_ROOMS) + + def is_exclusive_user(self, user_id): + return self._is_exclusive(ApplicationService.NS_USERS, user_id) + + def is_exclusive_alias(self, alias): + return self._is_exclusive(ApplicationService.NS_ALIASES, alias) + + def is_exclusive_room(self, room_id): + return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) + + def __str__(self): + return "ApplicationService: %s" % (self.__dict__,) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py new file mode 100644 index 0000000000..c2179f8d55 --- /dev/null +++ b/synapse/appservice/api.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet import defer + +from synapse.api.errors import CodeMessageException +from synapse.http.client import SimpleHttpClient +from synapse.events.utils import serialize_event + +import logging +import urllib + +logger = logging.getLogger(__name__) + + +class ApplicationServiceApi(SimpleHttpClient): + """This class manages HS -> AS communications, including querying and + pushing. + """ + + def __init__(self, hs): + super(ApplicationServiceApi, self).__init__(hs) + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def query_user(self, service, user_id): + uri = service.url + ("/users/%s" % urllib.quote(user_id)) + response = None + try: + response = yield self.get_json(uri, { + "access_token": service.hs_token + }) + if response is not None: # just an empty json object + defer.returnValue(True) + except CodeMessageException as e: + if e.code == 404: + defer.returnValue(False) + return + logger.warning("query_user to %s received %s", uri, e.code) + except Exception as ex: + logger.warning("query_user to %s threw exception %s", uri, ex) + defer.returnValue(False) + + @defer.inlineCallbacks + def query_alias(self, service, alias): + uri = service.url + ("/rooms/%s" % urllib.quote(alias)) + response = None + try: + response = yield self.get_json(uri, { + "access_token": service.hs_token + }) + if response is not None: # just an empty json object + defer.returnValue(True) + except CodeMessageException as e: + logger.warning("query_alias to %s received %s", uri, e.code) + if e.code == 404: + defer.returnValue(False) + return + except Exception as ex: + logger.warning("query_alias to %s threw exception %s", uri, ex) + defer.returnValue(False) + + @defer.inlineCallbacks + def push_bulk(self, service, events): + events = self._serialize(events) + + uri = service.url + ("/transactions/%s" % + urllib.quote(str(0))) # TODO txn_ids + response = None + try: + response = yield self.put_json( + uri=uri, + json_body={ + "events": events + }, + args={ + "access_token": service.hs_token + }) + if response: # just an empty json object + # TODO: Mark txn as sent successfully + defer.returnValue(True) + except CodeMessageException as e: + logger.warning("push_bulk to %s received %s", uri, e.code) + except Exception as ex: + logger.warning("push_bulk to %s threw exception %s", uri, ex) + defer.returnValue(False) + + @defer.inlineCallbacks + def push(self, service, event): + response = yield self.push_bulk(service, [event]) + defer.returnValue(response) + + def _serialize(self, events): + time_now = self.clock.time_msec() + return [ + serialize_event(e, time_now, as_client_event=True) for e in events + ] |