diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-03 15:44:50 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-03 15:44:50 +0000 |
commit | f594c8a2c36785c3fee5b113c97143745d1b44be (patch) | |
tree | c471022f8b1312f481c3f08bcd01a4a2edd374ac | |
parent | Add some comments (diff) | |
parent | Merge pull request #1192 from matrix-org/erikj/postgres_gist (diff) | |
download | synapse-erikj/new_profile.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/new_profile github/erikj/new_profile erikj/new_profile
-rw-r--r-- | CHANGES.rst | 23 | ||||
-rw-r--r-- | docs/metrics-howto.rst | 75 | ||||
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 2 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 2 | ||||
-rw-r--r-- | synapse/http/server.py | 23 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 6 | ||||
-rw-r--r-- | synapse/metrics/process_collector.py | 10 | ||||
-rw-r--r-- | synapse/rest/media/v1/download_resource.py | 3 | ||||
-rw-r--r-- | synapse/rest/media/v1/thumbnail_resource.py | 3 | ||||
-rw-r--r-- | synapse/storage/schema/delta/38/postgres_fts_gist.sql | 17 | ||||
-rw-r--r-- | synapse/storage/search.py | 27 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 154 |
13 files changed, 146 insertions, 201 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 3abd4d2b6b..371f26eb6e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,26 @@ +Changes in synapse v0.18.2 (2016-11-01) +======================================= + +No changes since v0.18.2-rc5 + + +Changes in synapse v0.18.2-rc5 (2016-10-28) +=========================================== + +Bug fixes: + +* Fix prometheus process metrics in worker processes (PR #1184) + + +Changes in synapse v0.18.2-rc4 (2016-10-27) +=========================================== + +Bug fixes: + +* Fix ``user_threepids`` schema delta, which in some instances prevented + startup after upgrade (PR #1183) + + Changes in synapse v0.18.2-rc3 (2016-10-27) =========================================== diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst index c1f5ae2174..7aa4757a35 100644 --- a/docs/metrics-howto.rst +++ b/docs/metrics-howto.rst @@ -15,36 +15,45 @@ How to monitor Synapse metrics using Prometheus Restart synapse -3: Check out synapse-prometheus-config - https://github.com/matrix-org/synapse-prometheus-config - -4: Add ``synapse.html`` and ``synapse.rules`` - The ``.html`` file needs to appear in prometheus's ``consoles`` directory, - and the ``.rules`` file needs to be invoked somewhere in the main config - file. A symlink to each from the git checkout into the prometheus directory - might be easiest to ensure ``git pull`` keeps it updated. - -5: Add a prometheus target for synapse - This is easiest if prometheus runs on the same machine as synapse, as it can - then just use localhost:: - - global: { - rule_file: "synapse.rules" - } - - job: { - name: "synapse" - - target_group: { - target: "http://localhost:9092/" - } - } - -6: Start prometheus:: - - ./prometheus -config.file=prometheus.conf - -7: Wait a few seconds for it to start and perform the first scrape, - then visit the console: - - http://server-where-prometheus-runs:9090/consoles/synapse.html +3: Add a prometheus target for synapse. It needs to set the ``metrics_path`` + to a non-default value:: + + - job_name: "synapse" + metrics_path: "/_synapse/metrics" + static_configs: + - targets: + "my.server.here:9092" + +Standard Metric Names +--------------------- + +As of synapse version 0.18.2, the format of the process-wide metrics has been +changed to fit prometheus standard naming conventions. Additionally the units +have been changed to seconds, from miliseconds. + +================================== ============================= +New name Old name +---------------------------------- ----------------------------- +process_cpu_user_seconds_total process_resource_utime / 1000 +process_cpu_system_seconds_total process_resource_stime / 1000 +process_open_fds (no 'type' label) process_fds +================================== ============================= + +The python-specific counts of garbage collector performance have been renamed. + +=========================== ====================== +New name Old name +--------------------------- ---------------------- +python_gc_time reactor_gc_time +python_gc_unreachable_total reactor_gc_unreachable +python_gc_counts reactor_gc_counts +=========================== ====================== + +The twisted-specific reactor metrics have been renamed. + +==================================== ================= +New name Old name +------------------------------------ ----------------- +python_twisted_reactor_pending_calls reactor_tick_time +python_twisted_reactor_tick_time reactor_tick_time +==================================== ================= diff --git a/synapse/__init__.py b/synapse/__init__.py index 2c5dcf59f7..4e2a592d3d 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.18.2-rc3" +__version__ = "0.18.2" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f27150d411..54f35900f8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -52,7 +52,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for -from synapse.metrics.process_collector import register_process_collector from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer @@ -338,7 +337,6 @@ def setup(config_options): hs.get_replication_layer().start_get_pdu_cache() register_memory_metrics(hs) - register_process_collector() reactor.callWhenRunning(start) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3635521230..3851b35889 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -653,7 +653,7 @@ class AuthHandler(BaseHandler): Returns: Hashed password (str). """ - return bcrypt.hashpw(password + self.hs.config.password_pepper, + return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) def validate_hash(self, password, stored_hash): diff --git a/synapse/http/server.py b/synapse/http/server.py index 168e53ce0c..14715878c5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -392,17 +392,30 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) if send_cors: - request.setHeader("Access-Control-Allow-Origin", "*") - request.setHeader("Access-Control-Allow-Methods", - "GET, POST, PUT, DELETE, OPTIONS") - request.setHeader("Access-Control-Allow-Headers", - "Origin, X-Requested-With, Content-Type, Accept") + set_cors_headers(request) request.write(json_bytes) finish_request(request) return NOT_DONE_YET +def set_cors_headers(request): + """Set the CORs headers so that javascript running in a web browsers can + use this API + + Args: + request (twisted.web.http.Request): The http request to add CORs to. + """ + request.setHeader("Access-Control-Allow-Origin", "*") + request.setHeader( + "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS" + ) + request.setHeader( + "Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept" + ) + + def finish_request(request): """ Finish writing the response to the request. diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6b868775d..7041da25ce 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -24,6 +24,7 @@ from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, MemoryUsageMetric, ) +from .process_collector import register_process_collector logger = logging.getLogger(__name__) @@ -41,6 +42,9 @@ class Metrics(object): def __init__(self, name): self.name_prefix = name + def make_subspace(self, name): + return Metrics("%s_%s" % (self.name_prefix, name)) + def register_collector(self, func): all_collectors.append(func) @@ -118,6 +122,8 @@ reactor_metrics.register_callback( "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] ) +register_process_collector(get_metrics_for("process")) + def runUntilCurrentTimer(func): diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 1c851d9234..0e95582368 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -20,8 +20,6 @@ import os import stat from resource import getrusage, RUSAGE_SELF -from synapse.metrics import get_metrics_for - TICKS_PER_SEC = 100 BYTES_PER_PAGE = 4096 @@ -111,10 +109,10 @@ def _process_fds(): return counts -def register_process_collector(): +def register_process_collector(process_metrics): # Legacy synapse-invented metric names - resource_metrics = get_metrics_for("process.resource") + resource_metrics = process_metrics.make_subspace("resource") resource_metrics.register_collector(update_resource_metrics) @@ -125,12 +123,10 @@ def register_process_collector(): # kilobytes resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024) - get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) + process_metrics.register_callback("fds", _process_fds, labels=["type"]) # New prometheus-standard metric names - process_metrics = get_metrics_for("process") - if HAVE_PROC_SELF_STAT: process_metrics.register_callback( "cpu_user_seconds_total", diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index a45ee9483e..dfb87ffd15 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -15,7 +15,7 @@ from ._base import parse_media_id, respond_with_file, respond_404 from twisted.web.resource import Resource -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -45,6 +45,7 @@ class DownloadResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) request.setHeader( "Content-Security-Policy", "default-src 'none';" diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 0b9e1de1a7..d8f54adc99 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -17,7 +17,7 @@ from ._base import parse_media_id, respond_404, respond_with_file from twisted.web.resource import Resource from synapse.http.servlet import parse_string, parse_integer -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -48,6 +48,7 @@ class ThumbnailResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width") height = parse_integer(request, "height") diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql new file mode 100644 index 0000000000..f090a7b75a --- /dev/null +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 12941d1775..8f2b3c4435 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,6 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" def __init__(self, hs): super(SearchStore, self).__init__(hs) @@ -41,6 +42,10 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) + self.register_background_update_handler( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, + self._background_reindex_gist_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -140,6 +145,28 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) @defer.inlineCallbacks + def _background_reindex_gist_search(self, progress, batch_size): + def create_index(conn): + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() + + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" + " ON event_search USING GIST (vector)" + ) + + c.execute("DROP INDEX event_search_fts_idx") + + conn.set_session(autocommit=False) + + if isinstance(self.database_engine, PostgresEngine): + yield self.runWithConnection(create_index) + + yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) + defer.returnValue(1) + + @defer.inlineCallbacks def _background_reindex_search_order(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 5055c04b24..adab520c78 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,13 +16,12 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached -from twisted.internet import defer, reactor +from twisted.internet import defer from canonicaljson import encode_canonical_json from collections import namedtuple -import itertools import logging import ujson as json @@ -50,20 +49,6 @@ class TransactionStore(SQLBaseStore): def __init__(self, hs): super(TransactionStore, self).__init__(hs) - # New transactions that are currently in flights - self.inflight_transactions = {} - - # Newly delievered transactions that *weren't* persisted while in flight - self.new_delivered_transactions = {} - - # Newly delivered transactions that *were* persisted while in flight - self.update_delivered_transactions = {} - - self.last_transaction = {} - - reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - self._clock.looping_call(self._persist_in_mem_txns, 1000) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): @@ -148,46 +133,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of previous transaction ids. """ - - auto_id = self._transaction_id_gen.get_next() - - txn_row = _TransactionRow( - id=auto_id, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None, - ) - - self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - - prev_txn = self.last_transaction.get(destination) - if prev_txn: - return defer.succeed(prev_txn) - else: - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) - - def _get_prevs_txn(self, txn, destination): - # First we find out what the prev_txns should be. - # Since we know that we are only sending one transaction at a time, - # we can simply take the last one. - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ?" - " ORDER BY id DESC LIMIT 1" - ) - - txn.execute(query, (destination,)) - results = self.cursor_to_dict(txn) - - prev_txns = [r["transaction_id"] for r in results] - - return prev_txns + return defer.succeed([]) def delivered_txn(self, transaction_id, destination, code, response_dict): """Persists the response for an outgoing transaction. @@ -198,52 +144,7 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - - txn_row = self.inflight_transactions.get( - destination, {} - ).pop(transaction_id, None) - - self.last_transaction[destination] = transaction_id - - if txn_row: - d = self.new_delivered_transactions.setdefault(destination, {}) - d[transaction_id] = txn_row._replace( - response_code=code, - response_json=None, # For now, don't persist response - ) - else: - d = self.update_delivered_transactions.setdefault(destination, {}) - # For now, don't persist response - d[transaction_id] = _UpdateTransactionRow(code, None) - - def get_transactions_after(self, transaction_id, destination): - """Get all transactions after a given local transaction_id. - - Args: - transaction_id (str) - destination (str) - - Returns: - list: A list of dicts - """ - return self.runInteraction( - "get_transactions_after", - self._get_transactions_after, transaction_id, destination - ) - - def _get_transactions_after(self, txn, transaction_id, destination): - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ? AND id >" - " (" - " SELECT id FROM sent_transactions" - " WHERE transaction_id = ? AND destination = ?" - " )" - ) - - txn.execute(query, (destination, transaction_id, destination)) - - return self.cursor_to_dict(txn) + pass @cached(max_entries=10000) def get_destination_retry_timings(self, destination): @@ -339,58 +240,11 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) - @defer.inlineCallbacks - def _persist_in_mem_txns(self): - try: - inflight = self.inflight_transactions - new_delivered = self.new_delivered_transactions - update_delivered = self.update_delivered_transactions - - self.inflight_transactions = {} - self.new_delivered_transactions = {} - self.update_delivered_transactions = {} - - full_rows = [ - row._asdict() - for txn_map in itertools.chain(inflight.values(), new_delivered.values()) - for row in txn_map.values() - ] - - def f(txn): - if full_rows: - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) - - for dest, txn_map in update_delivered.items(): - for txn_id, update_row in txn_map.items(): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": txn_id, - "destination": dest, - }, - updatevalues={ - "response_code": update_row.response_code, - "response_json": None, # For now, don't persist response - } - ) - - if full_rows or update_delivered: - yield self.runInteraction("_persist_in_mem_txns", f) - except: - logger.exception("Failed to persist transactions!") - def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 - six_hours_ago = now - 6 * 60 * 60 * 1000 def _cleanup_transactions_txn(txn): txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) - txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) - return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) + return self.runInteraction("_cleanup_transactions", _cleanup_transactions_txn) |