From 89de9349815b79f55bfff53b7c8c6d43f8c30336 Mon Sep 17 00:00:00 2001 From: Vincent Breitmoser Date: Sat, 6 Jan 2018 18:13:56 +0100 Subject: Use psycopg2cffi module instead of psycopg2 if running on pypy The psycopg2 package isn't available for PyPy. This commit adds a check if the runtime is PyPy, and if it is uses psycopg2cffi module in favor of psycopg2. This is almost a drop-in replacement, except for one place where an additional cast to string is required. --- synapse/storage/_base.py | 2 +- synapse/storage/engines/__init__.py | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2fbebd4907..2262776ab2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -376,7 +376,7 @@ class SQLBaseStore(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(column[0]) for column in cursor.description) + col_headers = list(intern(str(column[0])) for column in cursor.description) results = list( dict(zip(col_headers, row)) for row in cursor ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 338b495611..be7f6d6ac3 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,6 +18,7 @@ from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib +import platform SUPPORTED_MODULE = { @@ -31,7 +32,12 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: - module = importlib.import_module(name) + needs_pypy_hack = (name == "psycopg2" and + platform.python_implementation() == "PyPy") + if needs_pypy_hack: + module = importlib.import_module("psycopg2cffi") + else: + module = importlib.import_module(name) return engine_class(module, database_config) raise RuntimeError( -- cgit 1.5.1 From d1e56cfcd11bcd509d8fa3954c00e06a84bddd87 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Apr 2018 00:21:51 +0100 Subject: Fix pep8 error on psycopg2cffi hack --- synapse/storage/engines/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index be7f6d6ac3..8c868ece75 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -32,12 +32,11 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: - needs_pypy_hack = (name == "psycopg2" and - platform.python_implementation() == "PyPy") - if needs_pypy_hack: - module = importlib.import_module("psycopg2cffi") - else: - module = importlib.import_module(name) + # pypy requires psycopg2cffi rather than psycopg2 + if (name == "psycopg2" and + platform.python_implementation() == "PyPy"): + name = "psycopg2cffi" + module = importlib.import_module(name) return engine_class(module, database_config) raise RuntimeError( -- cgit 1.5.1 From d54cfbb7a826ccdff8b911b214bb7476a04a65a6 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 10 Apr 2018 17:38:16 +0100 Subject: fix typo --- synapse/storage/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eacd49d6a5..8cdfd50f90 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore, def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days + * Users who have created their accounts more than 30 days ago * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days + * Where account creation and last_seen are > 30 days apart Returns counts globaly for a given user as well as breaking by platform -- cgit 1.5.1 From 92e34615c5c9ed5df2b0072a2686d3e42239e420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:07:51 +0100 Subject: Track where event stream processing have gotten up to --- synapse/federation/transaction_queue.py | 4 ++++ synapse/handlers/appservice.py | 4 ++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events.py | 3 +++ 4 files changed, 24 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5b0b798e57..d68f0d8950 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -239,6 +239,10 @@ class TransactionQueue(object): "events", next_token ) + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..2a7e31e711 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -111,6 +111,10 @@ class ApplicationServicesHandler(object): events_processed_counter.inc_by(len(events)) yield self.store.set_appservice_last_pos(upper_bound) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2666f982a6..a6c0d7e1bf 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..da44b52fd6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" -- cgit 1.5.1 From 4dae4a97ed0e0b2cc9b5493172670ec7353ded2e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:52:19 +0100 Subject: Track last processed event received_ts --- synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events_worker.py | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, -- cgit 1.5.1 From 23a7f9d7f4cfbf0333a1def526a59fb4b0f01067 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:20:51 +0100 Subject: Doc we raise on unknown event --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 769eb51489..88360b48b8 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,13 +52,14 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event + """Get received_ts (when it was persisted) for the event. Raises an + exception for unknown events. Args: event_id (str) Returns: - Deferred[int|None]: Timstamp in milliseconds, or None for events + Deferred[int|None]: Timestamp in milliseconds, or None for events that were persisted before received_ts was implemented. """ return self._simple_select_one_onecol( -- cgit 1.5.1 From 415aeefd89ca48d3a3e9a7a981999ae071f6060b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 12:07:09 +0100 Subject: Format docstring --- synapse/storage/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 88360b48b8..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -52,8 +52,9 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): def get_received_ts(self, event_id): - """Get received_ts (when it was persisted) for the event. Raises an - exception for unknown events. + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. Args: event_id (str) -- cgit 1.5.1 From 878995e660e46c5dedb56cde8bd7a3d46838cdd7 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sun, 15 Apr 2018 21:37:58 +0200 Subject: Replace Queue with six.moves.queue and a six.range change which I missed the last time Signed-off-by: Adrian Tschira --- synapse/storage/event_federation.py | 6 ++++-- synapse/util/file_consumer.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 00ee82d300..e828328243 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -24,7 +24,9 @@ from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 import logging -from Queue import PriorityQueue, Empty +from six.moves.queue import PriorityQueue, Empty + +from six.moves import range logger = logging.getLogger(__name__) @@ -78,7 +80,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, front_list = list(front) chunks = [ front_list[x:x + 100] - for x in xrange(0, len(front), 100) + for x in range(0, len(front), 100) ] for chunk in chunks: txn.execute( diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 90a2608d6f..3c8a165331 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -17,7 +17,7 @@ from twisted.internet import threads, reactor from synapse.util.logcontext import make_deferred_yieldable, preserve_fn -import Queue +from six.moves import queue class BackgroundFileConsumer(object): @@ -49,7 +49,7 @@ class BackgroundFileConsumer(object): # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. - self._bytes_queue = Queue.Queue() + self._bytes_queue = queue.Queue() # Deferred that is resolved when finished writing self._finished_deferred = None -- cgit 1.5.1 From 639480e14a06723adf6817ddd2b2ff9e4f4cdf2a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 16 Apr 2018 18:41:37 +0100 Subject: Avoid creating events with huge numbers of prev_events In most cases, we limit the number of prev_events for a given event to 10 events. This fixes a particular code path which created events with huge numbers of prev_events. --- synapse/handlers/message.py | 78 +++++++++++++++++++--------------- synapse/handlers/room_member.py | 13 ++++-- synapse/storage/event_federation.py | 57 ++++++++++++++++++------- tests/storage/test_event_federation.py | 68 +++++++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 54 deletions(-) create mode 100644 tests/storage/test_event_federation.py (limited to 'synapse/storage') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 54cd691f91..21628a8540 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,7 +37,6 @@ from ._base import BaseHandler from canonicaljson import encode_canonical_json import logging -import random import simplejson logger = logging.getLogger(__name__) @@ -433,7 +432,7 @@ class EventCreationHandler(object): @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,7 +446,13 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. Returns: Tuple of created event (FrozenEvent), Context @@ -485,7 +490,7 @@ class EventCreationHandler(object): event, context = yield self.create_new_client_event( builder=builder, requester=requester, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) defer.returnValue((event, context)) @@ -588,39 +593,44 @@ class EventCreationHandler(object): @measure_func("create_new_client_event") @defer.inlineCallbacks - def create_new_client_event(self, builder, requester=None, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, + def create_new_client_event(self, builder, requester=None, + prev_events_and_hashes=None): + """Create a new event for a local client + + Args: + builder (EventBuilder): + + requester (synapse.types.Requester|None): + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. + + Returns: + Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] + """ + + if prev_events_and_hashes is not None: + assert len(prev_events_and_hashes) <= 10, \ + "Attempting to create an event with %i prev_events" % ( + len(prev_events_and_hashes), ) + else: + prev_events_and_hashes = \ + yield self.store.get_prev_events_for_room(builder.room_id) - # We want to limit the max number of prev events we point to in our - # new event - if len(latest_ret) > 10: - # Sort by reverse depth, so we point to the most recent. - latest_ret.sort(key=lambda a: -a[2]) - new_latest_ret = latest_ret[:5] - - # We also randomly point to some of the older events, to make - # sure that we don't completely ignore the older events. - if latest_ret[5:]: - sample_size = min(5, len(latest_ret[5:])) - new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) - latest_ret = new_latest_ret - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 + if prev_events_and_hashes: + depth = max([d for _, _, d in prev_events_and_hashes]) + 1 + else: + depth = 1 - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in prev_events_and_hashes + ] builder.prev_events = prev_events builder.depth = depth diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index c45142d38d..714583f1d5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -149,7 +149,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, - prev_event_ids, + prev_events_and_hashes, txn_id=None, ratelimit=True, content=None, @@ -175,7 +175,7 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) # Check if this event matches the previous membership event for the user. @@ -314,7 +314,12 @@ class RoomMemberHandler(object): 403, "Invites have been disabled on this server", ) - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + prev_events_and_hashes = yield self.store.get_prev_events_for_room( + room_id, + ) + latest_event_ids = ( + event_id for (event_id, _, _) in prev_events_and_hashes + ) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, ) @@ -403,7 +408,7 @@ class RoomMemberHandler(object): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_event_ids=latest_event_ids, + prev_events_and_hashes=prev_events_and_hashes, content=content, ) defer.returnValue(res) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 00ee82d300..a183fc6b50 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import random from twisted.internet import defer @@ -133,7 +134,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, retcol="event_id", ) + @defer.inlineCallbacks + def get_prev_events_for_room(self, room_id): + """ + Gets a subset of the current forward extremities in the given room. + + Limits the result to 10 extremities, so that we can avoid creating + events which refer to hundreds of prev_events. + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) + if len(res) > 10: + # Sort by reverse depth, so we point to the most recent. + res.sort(key=lambda a: -a[2]) + + # we use half of the limit for the actual most recent events, and + # the other half to randomly point to some of the older events, to + # make sure that we don't completely ignore the older events. + res = res[0:5] + random.sample(res[5:], 5) + + defer.returnValue(res) + def get_latest_event_ids_and_hashes_in_room(self, room_id): + """ + Gets the current forward extremities in the given room + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + return self.runInteraction( "get_latest_event_ids_and_hashes_in_room", self._get_latest_event_ids_and_hashes_in_room, @@ -182,22 +223,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, room_id, ) - @defer.inlineCallbacks - def get_max_depth_of_events(self, event_ids): - sql = ( - "SELECT MAX(depth) FROM events WHERE event_id IN (%s)" - ) % (",".join(["?"] * len(event_ids)),) - - rows = yield self._execute( - "get_max_depth_of_events", None, - sql, *event_ids - ) - - if rows: - defer.returnValue(rows[0][0]) - else: - defer.returnValue(1) - def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py new file mode 100644 index 0000000000..30683e7888 --- /dev/null +++ b/tests/storage/test_event_federation.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + + +class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_prev_events_for_room(self): + room_id = '@ROOM:local' + + # add a bunch of events and hashes to act as forward extremities + def insert_event(txn, i): + event_id = '$event_%i:local' % i + + txn.execute(( + "INSERT INTO events (" + " room_id, event_id, type, depth, topological_ordering," + " content, processed, outlier) " + "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)" + ), (room_id, event_id, i, i, True, False)) + + txn.execute(( + 'INSERT INTO event_forward_extremities (room_id, event_id) ' + 'VALUES (?, ?)' + ), (room_id, event_id)) + + txn.execute(( + 'INSERT INTO event_reference_hashes ' + '(event_id, algorithm, hash) ' + "VALUES (?, 'sha256', ?)" + ), (event_id, 'ffff')) + + for i in range(0, 11): + yield self.store.runInteraction("insert", insert_event, i) + + # this should get the last five and five others + r = yield self.store.get_prev_events_for_room(room_id) + self.assertEqual(10, len(r)) + for i in range(0, 5): + el = r[i] + depth = el[2] + self.assertEqual(10 - i, depth) + + for i in range(5, 5): + el = r[i] + depth = el[2] + self.assertLessEqual(5, depth) -- cgit 1.5.1 From b1dfbc3c4015865ab9798e5911b614e82a3dca2c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Apr 2018 18:30:53 +0100 Subject: Refactor store.have_events It turns out that most of the time we were calling have_events, we were only using half of the result. Replace have_events with have_seen_events and get_rejection_reasons, so that we can see what's going on a bit more clearly. --- synapse/federation/federation_client.py | 2 +- synapse/handlers/federation.py | 31 ++++++++------------- synapse/storage/events.py | 49 ++++++++++++++++++++++++++++----- 3 files changed, 55 insertions(+), 27 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 38440da5b5..8e2c0c4cd2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -394,7 +394,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.get_events(event_ids, allow_rejected=True) signed_events = seen_events.values() else: - seen_events = yield self.store.have_events(event_ids) + seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] failed_to_fetch = set() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 080aca3d71..ea6cb879fc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -149,10 +149,6 @@ class FederationHandler(BaseHandler): auth_chain = [] - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - fetch_state = False # Get missing pdus if necessary. @@ -168,7 +164,7 @@ class FederationHandler(BaseHandler): ) prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this @@ -196,8 +192,7 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: logger.info( @@ -248,8 +243,7 @@ class FederationHandler(BaseHandler): min_depth (int): Minimum depth of events to return. """ # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return @@ -361,9 +355,7 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + seen_ids = yield self.store.have_seen_events(event_ids) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication @@ -633,7 +625,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_events( + seen_events = yield self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -1736,7 +1728,8 @@ class FederationHandler(BaseHandler): event_key = None if event_auth_events - current_state: - have_events = yield self.store.have_events( + # TODO: can we use store.have_seen_events here instead? + have_events = yield self.store.get_seen_events_with_rejections( event_auth_events - current_state ) else: @@ -1759,12 +1752,12 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): + if e.event_id in seen_remotes: continue if e.event_id == event.event_id: @@ -1791,7 +1784,7 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.have_events( + have_events = yield self.store.get_seen_events_with_rejections( [e_id for e_id, _ in event.auth_events] ) seen_events = set(have_events.keys()) @@ -1876,13 +1869,13 @@ class FederationHandler(BaseHandler): local_auth_chain, ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] ) # 3. Process any remote auth chain events we haven't seen. for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index da44b52fd6..5fe4a0e56c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -16,6 +16,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps +import itertools import logging import simplejson as json @@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore): defer.returnValue(set(r["event_id"] for r in rows)) - def have_events(self, event_ids): + @defer.inlineCallbacks + def have_seen_events(self, event_ids): """Given a list of event ids, check if we have already processed them. + Args: + event_ids (iterable[str]): + Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. """ if not event_ids: return defer.succeed({}) @@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore): return res - return self.runInteraction( - "have_events", f, - ) + return self.runInteraction("get_rejection_reasons", f) @defer.inlineCallbacks def count_daily_messages(self): -- cgit 1.5.1 From ba3166743c50cb85c9ab2d35eb31968f892260bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Apr 2018 15:11:18 +0100 Subject: Fix quarantine media admin API --- synapse/storage/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 740c036975..c0c01bcfbe 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore): while next_token: sql = """ SELECT stream_ordering, json FROM events - JOIN event_json USING (event_id) + JOIN event_json USING (room_id, event_id) WHERE room_id = ? AND stream_ordering < ? AND contains_url = ? AND outlier = ? -- cgit 1.5.1 From 22881b3d692f44696189a1e0eeb6e83999dae10c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Apr 2018 15:32:04 +0100 Subject: Also fix reindexing of search --- synapse/storage/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 426cbe6e1a..6ba3e59889 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT stream_ordering, event_id, room_id, type, json, " " origin_server_ts FROM events" - " JOIN event_json USING (event_id)" + " JOIN event_json USING (room_id, event_id)" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" -- cgit 1.5.1 From 7ec8e798b4bc67b7365be3fc743e236b9f3c76ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Apr 2018 11:31:22 +0100 Subject: Fix media admin APIs --- synapse/storage/room.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index c0c01bcfbe..ea6a189185 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore): # Convert the IDs to MXC URIs for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) for hostname, media_id in remote_mxcs: remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) @@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore): if matches: hostname = matches.group(1) media_id = matches.group(2) - if hostname == self.hostname: + if hostname == self.hs.hostname: local_media_mxcs.append(media_id) else: remote_media_mxcs.append((hostname, media_id)) -- cgit 1.5.1