diff options
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/homeserver.py | 170 | ||||
-rw-r--r-- | synapse/federation/__init__.py | 9 | ||||
-rw-r--r-- | synapse/federation/replication.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/__init__.py | 52 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 4 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 82 | ||||
-rw-r--r-- | synapse/server.py | 135 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 45 | ||||
-rw-r--r-- | synapse/storage/_base.py | 49 | ||||
-rw-r--r-- | synapse/storage/events.py | 14 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 8 | ||||
-rw-r--r-- | synapse/storage/stream.py | 13 | ||||
-rw-r--r-- | synapse/storage/tags.py | 7 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 36 |
14 files changed, 280 insertions, 346 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6928d9d3e4..fb76be58a2 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -50,16 +50,14 @@ from twisted.cred import checkers, portal from twisted.internet import reactor, task, defer from twisted.application import service -from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File from twisted.web.server import Site, GzipEncoderFactory, Request -from synapse.http.server import JsonResource, RootRedirect +from synapse.http.server import RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, SERVER_KEY_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, @@ -69,6 +67,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.federation.transport.server import TransportLayerServer from synapse import events @@ -95,80 +94,37 @@ def gz_wrap(r): return EncodingResourceWrapper(r, [GzipEncoderFactory()]) -class SynapseHomeServer(HomeServer): - - def build_http_client(self): - return MatrixFederationHttpClient(self) - - def build_client_resource(self): - return ClientRestResource(self) - - def build_resource_for_federation(self): - return JsonResource(self) - - def build_resource_for_web_client(self): - webclient_path = self.get_config().web_client_location - if not webclient_path: - try: - import syweb - except ImportError: - quit_with_error( - "Could not find a webclient.\n\n" - "Please either install the matrix-angular-sdk or configure\n" - "the location of the source to serve via the configuration\n" - "option `web_client_location`\n\n" - "To install the `matrix-angular-sdk` via pip, run:\n\n" - " pip install '%(dep)s'\n" - "\n" - "You can also disable hosting of the webclient via the\n" - "configuration option `web_client`\n" - % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]} - ) - syweb_path = os.path.dirname(syweb.__file__) - webclient_path = os.path.join(syweb_path, "webclient") - # GZip is disabled here due to - # https://twistedmatrix.com/trac/ticket/7678 - # (It can stay enabled for the API resources: they call - # write() with the whole body and then finish() straight - # after and so do not trigger the bug. - # GzipFile was removed in commit 184ba09 - # return GzipFile(webclient_path) # TODO configurable? - return File(webclient_path) # TODO configurable? - - def build_resource_for_static_content(self): - # This is old and should go away: not going to bother adding gzip - return File( - os.path.join(os.path.dirname(synapse.__file__), "static") - ) - - def build_resource_for_content_repo(self): - return ContentRepoResource( - self, self.config.uploads_path, self.auth, self.content_addr - ) - - def build_resource_for_media_repository(self): - return MediaRepositoryResource(self) - - def build_resource_for_server_key(self): - return LocalKey(self) - - def build_resource_for_server_key_v2(self): - return KeyApiV2Resource(self) - - def build_resource_for_metrics(self): - if self.get_config().enable_metrics: - return MetricsResource(self) - else: - return None - - def build_db_pool(self): - name = self.db_config["name"] +def build_resource_for_web_client(hs): + webclient_path = hs.get_config().web_client_location + if not webclient_path: + try: + import syweb + except ImportError: + quit_with_error( + "Could not find a webclient.\n\n" + "Please either install the matrix-angular-sdk or configure\n" + "the location of the source to serve via the configuration\n" + "option `web_client_location`\n\n" + "To install the `matrix-angular-sdk` via pip, run:\n\n" + " pip install '%(dep)s'\n" + "\n" + "You can also disable hosting of the webclient via the\n" + "configuration option `web_client`\n" + % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]} + ) + syweb_path = os.path.dirname(syweb.__file__) + webclient_path = os.path.join(syweb_path, "webclient") + # GZip is disabled here due to + # https://twistedmatrix.com/trac/ticket/7678 + # (It can stay enabled for the API resources: they call + # write() with the whole body and then finish() straight + # after and so do not trigger the bug. + # GzipFile was removed in commit 184ba09 + # return GzipFile(webclient_path) # TODO configurable? + return File(webclient_path) # TODO configurable? - return adbapi.ConnectionPool( - name, - **self.db_config.get("args", {}) - ) +class SynapseHomeServer(HomeServer): def _listener_http(self, config, listener_config): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") @@ -178,13 +134,11 @@ class SynapseHomeServer(HomeServer): if tls and config.no_tls: return - metrics_resource = self.get_resource_for_metrics() - resources = {} for res in listener_config["resources"]: for name in res["names"]: if name == "client": - client_resource = self.get_client_resource() + client_resource = ClientRestResource(self) if res["compress"]: client_resource = gz_wrap(client_resource) @@ -198,31 +152,35 @@ class SynapseHomeServer(HomeServer): if name == "federation": resources.update({ - FEDERATION_PREFIX: self.get_resource_for_federation(), + FEDERATION_PREFIX: TransportLayerServer(self), }) if name in ["static", "client"]: resources.update({ - STATIC_PREFIX: self.get_resource_for_static_content(), + STATIC_PREFIX: File( + os.path.join(os.path.dirname(synapse.__file__), "static") + ), }) if name in ["media", "federation", "client"]: resources.update({ - MEDIA_PREFIX: self.get_resource_for_media_repository(), - CONTENT_REPO_PREFIX: self.get_resource_for_content_repo(), + MEDIA_PREFIX: MediaRepositoryResource(self), + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path, self.auth, self.content_addr + ), }) if name in ["keys", "federation"]: resources.update({ - SERVER_KEY_PREFIX: self.get_resource_for_server_key(), - SERVER_KEY_V2_PREFIX: self.get_resource_for_server_key_v2(), + SERVER_KEY_PREFIX: LocalKey(self), + SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self), }) if name == "webclient": - resources[WEB_CLIENT_PREFIX] = self.get_resource_for_web_client() + resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) - if name == "metrics" and metrics_resource: - resources[METRICS_PREFIX] = metrics_resource + if name == "metrics" and self.get_config().enable_metrics: + resources[METRICS_PREFIX] = MetricsResource(self) root_resource = create_resource_tree(resources) if tls: @@ -296,6 +254,17 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: quit_with_error(e.message) + def get_db_conn(self): + db_conn = self.database_engine.module.connect( + **{ + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + ) + + self.database_engine.on_new_connection(db_conn) + return db_conn + def quit_with_error(error_string): message_lines = error_string.split("\n") @@ -432,13 +401,7 @@ def setup(config_options): logger.info("Preparing database: %s...", config.database_config['name']) try: - db_conn = database_engine.module.connect( - **{ - k: v for k, v in config.database_config.get("args", {}).items() - if not k.startswith("cp_") - } - ) - + db_conn = hs.get_db_conn() database_engine.prepare_database(db_conn) hs.run_startup_checks(db_conn, database_engine) @@ -453,13 +416,17 @@ def setup(config_options): logger.info("Database prepared in %s.", config.database_config['name']) + hs.setup() hs.start_listening() - hs.get_pusherpool().start() - hs.get_state_handler().start_caching() - hs.get_datastore().start_profiling() - hs.get_datastore().start_doing_background_updates() - hs.get_replication_layer().start_get_pdu_cache() + def start(): + hs.get_pusherpool().start() + hs.get_state_handler().start_caching() + hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() + hs.get_replication_layer().start_get_pdu_cache() + + reactor.callWhenRunning(start) return hs @@ -675,7 +642,7 @@ def _resource_id(resource, path_seg): the mapping should looks like _resource_id(A,C) = B. Args: - resource (Resource): The *parent* Resource + resource (Resource): The *parent* Resourceb path_seg (str): The name of the child Resource to be attached. Returns: str: A unique string which can be a key to the child Resource. @@ -684,7 +651,7 @@ def _resource_id(resource, path_seg): def run(hs): - PROFILE_SYNAPSE = False + PROFILE_SYNAPSE = True if PROFILE_SYNAPSE: def profile(func): from cProfile import Profile @@ -761,6 +728,7 @@ def run(hs): auto_close_fds=False, verbose=True, logger=logger, + chdir=os.path.dirname(os.path.abspath(__file__)), ) daemon.start() diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py index 0bfb79d09f..979fdf2431 100644 --- a/synapse/federation/__init__.py +++ b/synapse/federation/__init__.py @@ -17,15 +17,10 @@ """ from .replication import ReplicationLayer -from .transport import TransportLayer +from .transport.client import TransportLayerClient def initialize_http_replication(homeserver): - transport = TransportLayer( - homeserver, - homeserver.hostname, - server=homeserver.get_resource_for_federation(), - client=homeserver.get_http_client() - ) + transport = TransportLayerClient(homeserver) return ReplicationLayer(homeserver, transport) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 6e0be8ef15..3e062a5eab 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -54,8 +54,6 @@ class ReplicationLayer(FederationClient, FederationServer): self.keyring = hs.get_keyring() self.transport_layer = transport_layer - self.transport_layer.register_received_handler(self) - self.transport_layer.register_request_handler(self) self.federation_client = self diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 155a7d5870..d9fcc520a0 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -20,55 +20,3 @@ By default this is done over HTTPS (and all home servers are required to support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ - -from .server import TransportLayerServer -from .client import TransportLayerClient - -from synapse.util.ratelimitutils import FederationRateLimiter - - -class TransportLayer(TransportLayerServer, TransportLayerClient): - """This is a basic implementation of the transport layer that translates - transactions and other requests to/from HTTP. - - Attributes: - server_name (str): Local home server host - - server (synapse.http.server.HttpServer): the http server to - register listeners on - - client (synapse.http.client.HttpClient): the http client used to - send requests - - request_handler (TransportRequestHandler): The handler to fire when we - receive requests for data. - - received_handler (TransportReceivedHandler): The handler to fire when - we receive data. - """ - - def __init__(self, homeserver, server_name, server, client): - """ - Args: - server_name (str): Local home server host - server (synapse.protocol.http.HttpServer): the http server to - register listeners on - client (synapse.protocol.http.HttpClient): the http client used to - send requests - """ - self.keyring = homeserver.get_keyring() - self.clock = homeserver.get_clock() - self.server_name = server_name - self.server = server - self.client = client - self.request_handler = None - self.received_handler = None - - self.ratelimiter = FederationRateLimiter( - self.clock, - window_size=homeserver.config.federation_rc_window_size, - sleep_limit=homeserver.config.federation_rc_sleep_limit, - sleep_msec=homeserver.config.federation_rc_sleep_delay, - reject_limit=homeserver.config.federation_rc_reject_limit, - concurrent_requests=homeserver.config.federation_rc_concurrent, - ) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 949d01dea8..2b5d40ea7f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -28,6 +28,10 @@ logger = logging.getLogger(__name__) class TransportLayerClient(object): """Sends federation HTTP requests to other servers""" + def __init__(self, hs): + self.server_name = hs.hostname + self.client = hs.get_http_client() + @log_function def get_room_state(self, destination, room_id, event_id): """ Requests all state for a given room from the given server at the diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 8dca0a7f6b..65e054f7dd 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -17,7 +17,8 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError -from synapse.util.logutils import log_function +from synapse.http.server import JsonResource +from synapse.util.ratelimitutils import FederationRateLimiter import functools import logging @@ -28,9 +29,41 @@ import re logger = logging.getLogger(__name__) -class TransportLayerServer(object): +class TransportLayerServer(JsonResource): """Handles incoming federation HTTP requests""" + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + super(TransportLayerServer, self).__init__(hs) + + self.authenticator = Authenticator(hs) + self.ratelimiter = FederationRateLimiter( + self.clock, + window_size=hs.config.federation_rc_window_size, + sleep_limit=hs.config.federation_rc_sleep_limit, + sleep_msec=hs.config.federation_rc_sleep_delay, + reject_limit=hs.config.federation_rc_reject_limit, + concurrent_requests=hs.config.federation_rc_concurrent, + ) + + self.register_servlets() + + def register_servlets(self): + register_servlets( + self.hs, + resource=self, + ratelimiter=self.ratelimiter, + authenticator=self.authenticator, + ) + + +class Authenticator(object): + def __init__(self, hs): + self.keyring = hs.get_keyring() + self.server_name = hs.hostname + # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks def authenticate_request(self, request): @@ -98,37 +131,9 @@ class TransportLayerServer(object): defer.returnValue((origin, content)) - @log_function - def register_received_handler(self, handler): - """ Register a handler that will be fired when we receive data. - - Args: - handler (TransportReceivedHandler) - """ - FederationSendServlet( - handler, - authenticator=self, - ratelimiter=self.ratelimiter, - server_name=self.server_name, - ).register(self.server) - - @log_function - def register_request_handler(self, handler): - """ Register a handler that will be fired when we get asked for data. - - Args: - handler (TransportRequestHandler) - """ - for servletclass in SERVLET_CLASSES: - servletclass( - handler, - authenticator=self, - ratelimiter=self.ratelimiter, - ).register(self.server) - class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter): + def __init__(self, handler, authenticator, ratelimiter, server_name): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter @@ -172,7 +177,9 @@ class FederationSendServlet(BaseFederationServlet): PATH = "/send/([^/]*)/" def __init__(self, handler, server_name, **kwargs): - super(FederationSendServlet, self).__init__(handler, **kwargs) + super(FederationSendServlet, self).__init__( + handler, server_name=server_name, **kwargs + ) self.server_name = server_name # This is when someone is trying to send us a bunch of data. @@ -432,6 +439,7 @@ class On3pidBindServlet(BaseFederationServlet): SERVLET_CLASSES = ( + FederationSendServlet, FederationPullServlet, FederationEventServlet, FederationStateServlet, @@ -451,3 +459,13 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, ) + + +def register_servlets(hs, resource, authenticator, ratelimiter): + for servletclass in SERVLET_CLASSES: + servletclass( + handler=hs.get_replication_layer(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) diff --git a/synapse/server.py b/synapse/server.py index 4a5796b982..e013a349c9 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -20,13 +20,15 @@ # Imports required for the default HomeServer() implementation from twisted.web.client import BrowserLikePolicyForHTTPS +from twisted.enterprise import adbapi + from synapse.federation import initialize_http_replication from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers from synapse.state import StateHandler -from synapse.storage import DataStore +from synapse.storage import get_datastore from synapse.util import Clock from synapse.util.distributor import Distributor from synapse.streams.events import EventSources @@ -36,8 +38,15 @@ from synapse.push.pusherpool import PusherPool from synapse.events.builder import EventBuilderFactory from synapse.api.filtering import Filtering +from synapse.http.matrixfederationclient import MatrixFederationHttpClient + +import logging + -class BaseHomeServer(object): +logger = logging.getLogger(__name__) + + +class HomeServer(object): """A basic homeserver object without lazy component builders. This will need all of the components it requires to either be passed as @@ -98,39 +107,18 @@ class BaseHomeServer(object): self.hostname = hostname self._building = {} + self.clock = Clock() + self.distributor = Distributor() + self.ratelimiter = Ratelimiter() + # Other kwargs are explicit dependencies for depname in kwargs: setattr(self, depname, kwargs[depname]) - @classmethod - def _make_dependency_method(cls, depname): - def _get(self): - if hasattr(self, depname): - return getattr(self, depname) - - if hasattr(self, "build_%s" % (depname)): - # Prevent cyclic dependencies from deadlocking - if depname in self._building: - raise ValueError("Cyclic dependency while building %s" % ( - depname, - )) - self._building[depname] = 1 - - builder = getattr(self, "build_%s" % (depname)) - dep = builder() - setattr(self, depname, dep) - - del self._building[depname] - - return dep - - raise NotImplementedError( - "%s has no %s nor a builder for it" % ( - type(self).__name__, depname, - ) - ) - - setattr(BaseHomeServer, "get_%s" % (depname), _get) + def setup(self): + logger.info("Setting up.") + self.datastore = get_datastore(self) + logger.info("Finished setting up.") def get_ip_from_request(self, request): # X-Forwarded-For is handled by our custom request type. @@ -142,33 +130,9 @@ class BaseHomeServer(object): def is_mine_id(self, string): return string.split(":", 1)[1] == self.hostname -# Build magic accessors for every dependency -for depname in BaseHomeServer.DEPENDENCIES: - BaseHomeServer._make_dependency_method(depname) - - -class HomeServer(BaseHomeServer): - """A homeserver object that will construct most of its dependencies as - required. - - It still requires the following to be specified by the caller: - resource_for_client - resource_for_web_client - resource_for_federation - resource_for_content_repo - http_client - db_pool - """ - - def build_clock(self): - return Clock() - def build_replication_layer(self): return initialize_http_replication(self) - def build_datastore(self): - return DataStore(self) - def build_handlers(self): return Handlers(self) @@ -179,10 +143,9 @@ class HomeServer(BaseHomeServer): return Auth(self) def build_http_client_context_factory(self): - config = self.get_config() return ( InsecureInterceptableContextFactory() - if config.use_insecure_ssl_client_just_for_testing_do_not_use + if self.config.use_insecure_ssl_client_just_for_testing_do_not_use else BrowserLikePolicyForHTTPS() ) @@ -201,15 +164,9 @@ class HomeServer(BaseHomeServer): def build_state_handler(self): return StateHandler(self) - def build_distributor(self): - return Distributor() - def build_event_sources(self): return EventSources(self) - def build_ratelimiter(self): - return Ratelimiter() - def build_keyring(self): return Keyring(self) @@ -224,3 +181,55 @@ class HomeServer(BaseHomeServer): def build_pusherpool(self): return PusherPool(self) + + def build_http_client(self): + return MatrixFederationHttpClient(self) + + def build_db_pool(self): + name = self.db_config["name"] + + return adbapi.ConnectionPool( + name, + **self.db_config.get("args", {}) + ) + + +def _make_dependency_method(depname): + def _get(hs): + try: + return getattr(hs, depname) + except AttributeError: + pass + + try: + builder = getattr(hs, "build_%s" % (depname)) + except AttributeError: + builder = None + + if builder: + # Prevent cyclic dependencies from deadlocking + if depname in hs._building: + raise ValueError("Cyclic dependency while building %s" % ( + depname, + )) + hs._building[depname] = 1 + + dep = builder() + setattr(hs, depname, dep) + + del hs._building[depname] + + return dep + + raise NotImplementedError( + "%s has no %s nor a builder for it" % ( + type(hs).__name__, depname, + ) + ) + + setattr(HomeServer, "get_%s" % (depname), _get) + + +# Build magic accessors for every dependency +for depname in HomeServer.DEPENDENCIES: + _make_dependency_method(depname) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7a3f6c4662..c8cab45f77 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -46,6 +46,9 @@ from .tags import TagsStore from .account_data import AccountDataStore +from util.id_generators import IdGenerator, StreamIdGenerator + + import logging @@ -58,6 +61,22 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120*1000 +def get_datastore(hs): + logger.info("getting called!") + + conn = hs.get_db_conn() + try: + cur = conn.cursor() + cur.execute("SELECT MIN(stream_ordering) FROM events",) + rows = cur.fetchall() + min_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1 + min_token = min(min_token, -1) + + return DataStore(conn, hs, min_token) + finally: + conn.close() + + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, @@ -79,18 +98,36 @@ class DataStore(RoomMemberStore, RoomStore, EventPushActionsStore ): - def __init__(self, hs): - super(DataStore, self).__init__(hs) + def __init__(self, db_conn, hs, min_stream_token): self.hs = hs - self.min_token_deferred = self._get_min_token() - self.min_token = None + self.min_stream_token = min_stream_token self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, ) + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering" + ) + self._receipts_id_gen = StreamIdGenerator( + db_conn, "receipts_linearized", "stream_id" + ) + self._account_data_id_gen = StreamIdGenerator( + db_conn, "account_data_max_stream_id", "stream_id" + ) + + self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) + self._state_groups_id_gen = IdGenerator("state_groups", "id", self) + self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) + self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self) + self._pushers_id_gen = IdGenerator("pushers", "id", self) + self._push_rule_id_gen = IdGenerator("push_rules", "id", self) + self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + + super(DataStore, self).__init__(hs) + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 90d7aee94a..5e77320540 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,13 +15,11 @@ import logging from synapse.api.errors import StoreError -from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache import synapse.metrics -from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer @@ -175,16 +173,6 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine - self._stream_id_gen = StreamIdGenerator("events", "stream_ordering") - self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) - self._state_groups_id_gen = IdGenerator("state_groups", "id", self) - self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) - self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self) - self._pushers_id_gen = IdGenerator("pushers", "id", self) - self._push_rule_id_gen = IdGenerator("push_rules", "id", self) - self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) - self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id") - def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -345,7 +333,8 @@ class SQLBaseStore(object): defer.returnValue(result) - def cursor_to_dict(self, cursor): + @staticmethod + def cursor_to_dict(cursor): """Converts a SQL cursor into an list of dicts. Args: @@ -402,8 +391,8 @@ class SQLBaseStore(object): if not or_ignore: raise - @log_function - def _simple_insert_txn(self, txn, table, values): + @staticmethod + def _simple_insert_txn(txn, table, values): keys, vals = zip(*values.items()) sql = "INSERT INTO %s (%s) VALUES(%s)" % ( @@ -414,7 +403,8 @@ class SQLBaseStore(object): txn.execute(sql, vals) - def _simple_insert_many_txn(self, txn, table, values): + @staticmethod + def _simple_insert_many_txn(txn, table, values): if not values: return @@ -537,9 +527,10 @@ class SQLBaseStore(object): table, keyvalues, retcol, allow_none=allow_none, ) - def _simple_select_one_onecol_txn(self, txn, table, keyvalues, retcol, + @classmethod + def _simple_select_one_onecol_txn(cls, txn, table, keyvalues, retcol, allow_none=False): - ret = self._simple_select_onecol_txn( + ret = cls._simple_select_onecol_txn( txn, table=table, keyvalues=keyvalues, @@ -554,7 +545,8 @@ class SQLBaseStore(object): else: raise StoreError(404, "No row found") - def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol): + @staticmethod + def _simple_select_onecol_txn(txn, table, keyvalues, retcol): sql = ( "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" ) % { @@ -603,7 +595,8 @@ class SQLBaseStore(object): table, keyvalues, retcols ) - def _simple_select_list_txn(self, txn, table, keyvalues, retcols): + @classmethod + def _simple_select_list_txn(cls, txn, table, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -627,7 +620,7 @@ class SQLBaseStore(object): ) txn.execute(sql) - return self.cursor_to_dict(txn) + return cls.cursor_to_dict(txn) @defer.inlineCallbacks def _simple_select_many_batch(self, table, column, iterable, retcols, @@ -662,7 +655,8 @@ class SQLBaseStore(object): defer.returnValue(results) - def _simple_select_many_txn(self, txn, table, column, iterable, keyvalues, retcols): + @classmethod + def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -699,7 +693,7 @@ class SQLBaseStore(object): ) txn.execute(sql, values) - return self.cursor_to_dict(txn) + return cls.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, desc="_simple_update_one"): @@ -726,7 +720,8 @@ class SQLBaseStore(object): table, keyvalues, updatevalues, ) - def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + @staticmethod + def _simple_update_one_txn(txn, table, keyvalues, updatevalues): update_sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), @@ -743,7 +738,8 @@ class SQLBaseStore(object): if txn.rowcount > 1: raise StoreError(500, "More than one row matched") - def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + @staticmethod + def _simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False): select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), @@ -784,7 +780,8 @@ class SQLBaseStore(object): raise StoreError(500, "more than one row matched") return self.runInteraction(desc, func) - def _simple_delete_txn(self, txn, table, keyvalues): + @staticmethod + def _simple_delete_txn(txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ba368a3eca..298cb9bada 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -66,11 +66,9 @@ class EventsStore(SQLBaseStore): return if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - start = self.min_token - 1 - self.min_token -= len(events_and_contexts) + 1 - stream_orderings = range(start, self.min_token, -1) + start = self.min_stream_token - 1 + self.min_stream_token -= len(events_and_contexts) + 1 + stream_orderings = range(start, self.min_stream_token, -1) @contextmanager def stream_ordering_manager(): @@ -107,10 +105,8 @@ class EventsStore(SQLBaseStore): is_new_state=True, current_state=None): stream_ordering = None if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token + self.min_stream_token -= 1 + stream_ordering = self.min_stream_token if stream_ordering is None: stream_ordering_manager = yield self._stream_id_gen.get_next(self) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c4232bdc65..c0593e23ee 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -31,7 +31,9 @@ class ReceiptsStore(SQLBaseStore): def __init__(self, hs): super(ReceiptsStore, self).__init__(hs) - self._receipts_stream_cache = _RoomStreamChangeCache() + self._receipts_stream_cache = _RoomStreamChangeCache( + self._receipts_id_gen.get_max_token(None) + ) @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): @@ -377,11 +379,11 @@ class _RoomStreamChangeCache(object): may have changed since that key. If the key is too old then the cache will simply return all rooms. """ - def __init__(self, size_of_cache=10000): + def __init__(self, current_key, size_of_cache=10000): self._size_of_cache = size_of_cache self._room_to_key = {} self._cache = sorteddict() - self._earliest_key = None + self._earliest_key = current_key self.name = "ReceiptsRoomChangeCache" caches_by_name[self.name] = self._cache diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 02b1913e26..e31bad258a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -444,19 +444,6 @@ class StreamStore(SQLBaseStore): rows = txn.fetchall() return rows[0][0] if rows else 0 - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) - - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) - - logger.debug("min_token is: %s", self.min_token) - - defer.returnValue(self.min_token) - @staticmethod def _set_before_and_after(events, rows): for event, row in zip(events, rows): diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index ed9c91e5ea..4c39e07cbd 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -16,7 +16,6 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer -from .util.id_generators import StreamIdGenerator import ujson as json import logging @@ -25,12 +24,6 @@ logger = logging.getLogger(__name__) class TagsStore(SQLBaseStore): - def __init__(self, hs): - super(TagsStore, self).__init__(hs) - - self._account_data_id_gen = StreamIdGenerator( - "account_data_max_stream_id", "stream_id" - ) def get_max_account_data_stream_id(self): """Get the current max stream id for the private user data stream diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f58bf7fd2c..5c522f4ab9 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -72,28 +72,24 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self, table, column): + def __init__(self, db_conn, table, column): self.table = table self.column = column self._lock = threading.Lock() - self._current_max = None + cur = db_conn.cursor() + self._current_max = self._get_or_compute_current_max(cur) + cur.close() + self._unfinished_ids = deque() - @defer.inlineCallbacks def get_next(self, store): """ Usage: with yield stream_id_gen.get_next as stream_id: # ... persist event ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: self._current_max += 1 next_id = self._current_max @@ -108,21 +104,14 @@ class StreamIdGenerator(object): with self._lock: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_next_mult(self, store, n): """ Usage: with yield stream_id_gen.get_next(store, n) as stream_ids: # ... persist events ... """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: next_ids = range(self._current_max + 1, self._current_max + n + 1) self._current_max += n @@ -139,24 +128,17 @@ class StreamIdGenerator(object): for next_id in next_ids: self._unfinished_ids.remove(next_id) - defer.returnValue(manager()) + return manager() - @defer.inlineCallbacks def get_max_token(self, store): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. """ - if not self._current_max: - yield store.runInteraction( - "_compute_current_max", - self._get_or_compute_current_max, - ) - with self._lock: if self._unfinished_ids: - defer.returnValue(self._unfinished_ids[0] - 1) + return self._unfinished_ids[0] - 1 - defer.returnValue(self._current_max) + return self._current_max def _get_or_compute_current_max(self, txn): with self._lock: |