From 89de9349815b79f55bfff53b7c8c6d43f8c30336 Mon Sep 17 00:00:00 2001 From: Vincent Breitmoser Date: Sat, 6 Jan 2018 18:13:56 +0100 Subject: Use psycopg2cffi module instead of psycopg2 if running on pypy The psycopg2 package isn't available for PyPy. This commit adds a check if the runtime is PyPy, and if it is uses psycopg2cffi module in favor of psycopg2. This is almost a drop-in replacement, except for one place where an additional cast to string is required. --- synapse/storage/_base.py | 2 +- synapse/storage/engines/__init__.py | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2fbebd4907..2262776ab2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -376,7 +376,7 @@ class SQLBaseStore(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(column[0]) for column in cursor.description) + col_headers = list(intern(str(column[0])) for column in cursor.description) results = list( dict(zip(col_headers, row)) for row in cursor ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 338b495611..be7f6d6ac3 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,6 +18,7 @@ from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib +import platform SUPPORTED_MODULE = { @@ -31,7 +32,12 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: - module = importlib.import_module(name) + needs_pypy_hack = (name == "psycopg2" and + platform.python_implementation() == "PyPy") + if needs_pypy_hack: + module = importlib.import_module("psycopg2cffi") + else: + module = importlib.import_module(name) return engine_class(module, database_config) raise RuntimeError( -- cgit 1.5.1 From d1e56cfcd11bcd509d8fa3954c00e06a84bddd87 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Apr 2018 00:21:51 +0100 Subject: Fix pep8 error on psycopg2cffi hack --- synapse/storage/engines/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index be7f6d6ac3..8c868ece75 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -32,12 +32,11 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: - needs_pypy_hack = (name == "psycopg2" and - platform.python_implementation() == "PyPy") - if needs_pypy_hack: - module = importlib.import_module("psycopg2cffi") - else: - module = importlib.import_module(name) + # pypy requires psycopg2cffi rather than psycopg2 + if (name == "psycopg2" and + platform.python_implementation() == "PyPy"): + name = "psycopg2cffi" + module = importlib.import_module(name) return engine_class(module, database_config) raise RuntimeError( -- cgit 1.5.1 From d54cfbb7a826ccdff8b911b214bb7476a04a65a6 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 10 Apr 2018 17:38:16 +0100 Subject: fix typo --- synapse/storage/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eacd49d6a5..8cdfd50f90 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore, def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days + * Users who have created their accounts more than 30 days ago * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days + * Where account creation and last_seen are > 30 days apart Returns counts globaly for a given user as well as breaking by platform -- cgit 1.5.1 From 92e34615c5c9ed5df2b0072a2686d3e42239e420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:07:51 +0100 Subject: Track where event stream processing have gotten up to --- synapse/federation/transaction_queue.py | 4 ++++ synapse/handlers/appservice.py | 4 ++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events.py | 3 +++ 4 files changed, 24 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5b0b798e57..d68f0d8950 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -239,6 +239,10 @@ class TransactionQueue(object): "events", next_token ) + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..2a7e31e711 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -111,6 +111,10 @@ class ApplicationServicesHandler(object): events_processed_counter.inc_by(len(events)) yield self.store.set_appservice_last_pos(upper_bound) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2666f982a6..a6c0d7e1bf 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..da44b52fd6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" -- cgit 1.5.1 From 4dae4a97ed0e0b2cc9b5493172670ec7353ded2e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:52:19 +0100 Subject: Track last processed event received_ts --- synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events_worker.py | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, -- cgit 1.5.1 From 23a7f9d7f4cfbf0333a1def526a59fb4b0f01067 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:20:51 +0100 Subject: Doc we raise on unknown event --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 769eb51489..88360b48b8 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,13 +52,14 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event + """Get received_ts (when it was persisted) for the event. Raises an + exception for unknown events. Args: event_id (str) Returns: - Deferred[int|None]: Timstamp in milliseconds, or None for events + Deferred[int|None]: Timestamp in milliseconds, or None for events that were persisted before received_ts was implemented. """ return self._simple_select_one_onecol( -- cgit 1.5.1 From 415aeefd89ca48d3a3e9a7a981999ae071f6060b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 12:07:09 +0100 Subject: Format docstring --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 88360b48b8..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,8 +52,9 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event. Raises an - exception for unknown events. + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. Args: event_id (str) -- cgit 1.5.1