From 605defb9e4c274d61d9da86546e6fd6c78f3f6cf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:16:28 +0100 Subject: Add missing consumeErrors In general we want defer.gatherResults to consumeErrors, rather than having exceptions hanging around and getting logged as CRITICAL unhandled errors. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..3b8b539993 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -202,7 +202,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ])) + ], consumeErrors=True)) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) -- cgit 1.4.1 From 2a13af23bc0561ab48e0a90528231c40ee209724 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:29:27 +0100 Subject: Use run_in_background in preference to preserve_fn While I was going through uses of preserve_fn for other PRs, I converted places which only use the wrapped function once to use run_in_background, to avoid creating the function object. --- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 4 ++-- synapse/app/synchrotron.py | 5 ++--- synapse/appservice/scheduler.py | 12 ++++++------ synapse/crypto/keyring.py | 28 ++++++++++++++++----------- synapse/federation/federation_client.py | 5 +++-- synapse/groups/attestations.py | 4 ++-- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/e2e_keys.py | 6 +++--- synapse/handlers/federation.py | 16 +++++++++------ synapse/handlers/initial_sync.py | 12 +++++++----- synapse/handlers/message.py | 5 +++-- synapse/handlers/typing.py | 7 ++++--- synapse/push/pusherpool.py | 20 +++++++++++-------- synapse/rest/media/v1/preview_url_resource.py | 5 +++-- synapse/storage/events_worker.py | 5 +++-- synapse/storage/stream.py | 5 +++-- synapse/util/async.py | 4 ++-- synapse/util/file_consumer.py | 6 ++++-- synapse/util/logcontext.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 4 ++-- 22 files changed, 97 insertions(+), 71 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..c6daa0d43f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -229,7 +229,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..8bd5c0c2b7 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..0c4ccc58bc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..ba1631b5c8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -51,7 +51,7 @@ components. from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure import logging @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..38944a7326 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( PreserveLoggingContext, - preserve_fn + preserve_fn, + run_in_background, ) from synapse.util.metrics import Measure @@ -127,7 +128,7 @@ class Keyring(object): verify_requests.append(verify_request) - preserve_fn(self._start_key_lookups)(verify_requests) + run_in_background(self._start_key_lookups, verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -313,7 +314,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - preserve_fn(do_iterations)().addErrback(on_err) + run_in_background(do_iterations).addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): @@ -329,8 +330,9 @@ class Keyring(object): """ res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.get_server_verify_keys)( - server_name, key_ids + run_in_background( + self.store.get_server_verify_keys, + server_name, key_ids, ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], @@ -358,7 +360,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(p_name, p_keys) + run_in_background(get_key, p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, @@ -398,7 +400,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(server_name, key_ids) + run_in_background(get_key, server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, @@ -481,7 +483,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -539,7 +542,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -615,7 +619,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_keys_json)( + run_in_background( + self.store.store_server_keys_json, server_name=server_name, key_id=key_id, from_server=server_name, @@ -716,7 +721,8 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. return logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_verify_key)( + run_in_background( + self.store.store_server_verify_key, server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..8adc60863e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,7 +33,7 @@ from synapse.federation.federation_base import ( import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination @@ -417,7 +417,8 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..5f53f17954 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -42,7 +42,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from signedjson.sign import sign_json @@ -192,4 +192,4 @@ class GroupAttestionRenewer(object): group_id = row["group_id"] user_id = row["user_id"] - preserve_fn(_renew_attestation)(group_id, user_id) + run_in_background(_renew_attestation, group_id, user_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0245197c02..6cc2388306 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -198,7 +198,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..fc958404a1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -139,7 +139,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache ])) @@ -242,7 +242,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries ])) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..c66ca0f381 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -637,7 +637,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..244b98dd8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,7 +850,8 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @@ -862,7 +863,7 @@ class EventCreationHandler(object): extra_users=extra_users ) - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..19cde70adf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..7bb5733090 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -137,8 +137,9 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) @@ -164,7 +165,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) yield make_deferred_yieldable(defer.gatherResults(deferreds)) @@ -207,7 +211,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0fc21540c6..9290d7946f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ from ._base import FileInfo from synapse.api.errors import ( SynapseError, Codes, ) -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -144,7 +144,8 @@ class PreviewUrlResource(Resource): observable = self._cache.get(url) if not observable: - download = preserve_fn(self._do_preview)( + download = run_in_background( + self._do_preview, url, requester.user, ts, ) observable = ObservableDeferred( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..5b245a936c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc @@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..bd07067328 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit): pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() + run_in_background(_concurrently_execute_inner) for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3c8a165331..3380970e4e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from six.moves import queue @@ -70,7 +70,9 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + self._finished_deferred = run_in_background( + threads.deferToThread, self._writer + ) if not streaming: self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..c2edf87e58 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred): returning a deferred. Then, when the deferred completes, restores the current logcontext before running callbacks/errbacks. - (This is more-or-less the opposite operation to preserve_fn.) + (This is more-or-less the opposite operation to run_in_background.) """ if isinstance(deferred, defer.Deferred) and not deferred.called: prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 1101881a2d..18424f6c36 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import collections import contextlib @@ -150,7 +150,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) + ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 47b0bb5eb3..4e93f69d3a 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -203,8 +203,8 @@ class RetryDestinationLimiter(object): ) except Exception: logger.exception( - "Failed to store set_destination_retry_timings", + "Failed to store destination_retry_timings", ) # we deliberately do this in the background. - synapse.util.logcontext.preserve_fn(store_retry_timings)() + synapse.util.logcontext.run_in_background(store_retry_timings) -- cgit 1.4.1 From d82b6ea9e68150aece1fc46cb0821b31cf728910 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 13:57:00 +0200 Subject: Move more xrange to six plus a bonus next() Signed-off-by: Adrian Tschira --- synapse/federation/federation_client.py | 4 +++- synapse/handlers/room_list.py | 4 +++- synapse/storage/registration.py | 4 +++- synapse/storage/schema/delta/30/as_users.py | 4 +++- synapse/storage/stream.py | 4 +++- synapse/storage/tags.py | 4 +++- synapse/util/async.py | 6 ++++-- synapse/util/stringutils.py | 5 +++-- synapse/util/wheel_timer.py | 4 +++- 9 files changed, 28 insertions(+), 11 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..a2067fc390 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,8 @@ import itertools import logging import random +from six.moves import range + from twisted.internet import defer from synapse.api.constants import Membership @@ -413,7 +415,7 @@ class FederationClient(FederationBase): batch_size = 20 missing_events = list(missing_events) - for i in xrange(0, len(missing_events), batch_size): + for i in range(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [ diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index add3f9b009..5757bb7f8a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from six.moves import range + from ._base import BaseHandler from synapse.api.constants import ( @@ -200,7 +202,7 @@ class RoomListHandler(BaseHandler): step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 chunk = [] - for i in xrange(0, len(rooms_to_scan), step): + for i in range(0, len(rooms_to_scan), step): batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 6b557ca0cf..a50717db2d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -22,6 +22,8 @@ from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from six.moves import range + class RegistrationWorkerStore(SQLBaseStore): @cached() @@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore, match = regex.search(user_id) if match: found.add(int(match.group(1))) - for i in xrange(len(found) + 1): + for i in range(len(found) + 1): if i not in found: return i diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index c53e53c94f..85bd1a2006 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -14,6 +14,8 @@ import logging from synapse.config.appservice import load_appservices +from six.moves import range + logger = logging.getLogger(__name__) @@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n)) + user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3b8b539993..52c90d8e04 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -47,6 +47,8 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -196,7 +198,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 13bff9f055..6671d3cfca 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -22,6 +22,8 @@ from twisted.internet import defer import simplejson as json import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore): batch_size = 50 results = [] - for i in xrange(0, len(tag_ids), batch_size): + for i in range(0, len(tag_ids), batch_size): tags = yield self.runInteraction( "get_all_updated_tag_content", get_tag_content, diff --git a/synapse/util/async.py b/synapse/util/async.py index 1df5c5600c..cb53c31123 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -27,6 +27,8 @@ from contextlib import contextmanager import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -158,13 +160,13 @@ def concurrently_execute(func, args, limit): def _concurrently_execute_inner(): try: while True: - yield func(it.next()) + yield func(next(it)) except StopIteration: pass return logcontext.make_deferred_yieldable(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() - for _ in xrange(limit) + for _ in range(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index 95a6168e16..b98b9dc6e4 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -15,6 +15,7 @@ import random import string +from six.moves import range _string_with_symbols = ( string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" @@ -22,12 +23,12 @@ _string_with_symbols = ( def random_string(length): - return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) + return ''.join(random.choice(string.ascii_letters) for _ in range(length)) def random_string_with_symbols(length): return ''.join( - random.choice(_string_with_symbols) for _ in xrange(length) + random.choice(_string_with_symbols) for _ in range(length) ) diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index b70f9a6b0a..7a9e45aca9 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six.moves import range + class _Entry(object): __slots__ = ["end_key", "queue"] @@ -68,7 +70,7 @@ class WheelTimer(object): # Add empty entries between the end of the current list and when we want # to insert. This ensures there are no gaps. self.entries.extend( - _Entry(key) for key in xrange(last_key, then_key + 1) + _Entry(key) for key in range(last_key, then_key + 1) ) self.entries[-1].queue.append(obj) -- cgit 1.4.1 From 06c0d0ed081b56716135c4881e600861b2e8cad5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 15:45:38 +0100 Subject: Split paginate_room_events storage function --- synapse/storage/stream.py | 100 +++++++++++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 28 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f0784ba137..b57a8a7ef6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -738,17 +738,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) + def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None, + direction='b', limit=-1, event_filter=None): + """Returns list of events before or after a given token. -class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self): - return self._stream_id_gen.get_current_token() - - def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() + Args: + txn + room_id (str) + from_key (str): The token used to stream from + to_key (str|None): A token which if given limits the results to + only those before + direction(char): Either 'b' or 'f' to indicate whether we are + paginating forwards or backwards from `from_key`. + limit (int): The maximum number of events to return. Zero or less + means no limit. + event_filter (Filter|None): If provided filters the events to + those that match the filter. - @defer.inlineCallbacks - def paginate_room_events(self, room_id, from_key, to_key=None, - direction='b', limit=-1, event_filter=None): + Returns: + tuple[list[dict], str]: Returns the results as a list of dicts and + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_orderign". + """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -795,29 +806,54 @@ class StreamStore(StreamWorkerStore): "limit": limit_str } - def f(txn): - txn.execute(sql, args) + txn.execute(sql, args) - rows = self.cursor_to_dict(txn) + rows = self.cursor_to_dict(txn) - if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] - if direction == 'b': - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - toke -= 1 - next_token = str(RoomStreamToken(topo, toke)) - else: - # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + toke -= 1 + next_token = str(RoomStreamToken(topo, toke)) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + return rows, next_token, - return rows, next_token, + @defer.inlineCallbacks + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, event_filter=None): + """Returns list of events before or after a given token. - rows, token = yield self.runInteraction("paginate_room_events", f) + Args: + room_id (str) + from_key (str): The token used to stream from + to_key (str|None): A token which if given limits the results to + only those before + direction(char): Either 'b' or 'f' to indicate whether we are + paginating forwards or backwards from `from_key`. + limit (int): The maximum number of events to return. Zero or less + means no limit. + event_filter (Filter|None): If provided filters the events to + those that match the filter. + + Returns: + tuple[list[dict], str]: Returns the results as a list of dicts and + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_orderign". + """ + + rows, token = yield self.runInteraction( + "paginate_room_events", self.paginate_room_events_txn, + room_id, from_key, to_key, direction, limit, event_filter, + ) events = yield self._get_events( [r["event_id"] for r in rows], @@ -827,3 +863,11 @@ class StreamStore(StreamWorkerStore): self._set_before_and_after(events, rows) defer.returnValue((events, token)) + + +class StreamStore(StreamWorkerStore): + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() -- cgit 1.4.1 From 274b8c6025e15eede0137bcb6a73ac00bf370ee8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:15:07 +0100 Subject: Only fetch required fields from database --- synapse/storage/stream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b57a8a7ef6..d3adb0bf37 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -796,7 +796,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): limit_str = "" sql = ( - "SELECT * FROM events" + "SELECT event_id, topological_ordering, stream_ordering" + " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" -- cgit 1.4.1 From 3e6d306e94327983b1f6b66cec9faace642d3f16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:18:58 +0100 Subject: Parse tokens before calling DB function --- synapse/storage/stream.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d3adb0bf37..ce98587608 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -738,16 +738,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None, + def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, direction='b', limit=-1, event_filter=None): """Returns list of events before or after a given token. Args: txn room_id (str) - from_key (str): The token used to stream from - to_key (str|None): A token which if given limits the results to - only those before + from_token (RoomStreamToken): The token used to stream from + to_token (RoomStreamToken|None): A token which if given limits the + results to only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. limit (int): The maximum number of events to return. Zero or less @@ -757,7 +757,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have + a token that points to the end of the result set. The dicts haveq the keys "event_id", "toplogical_ordering" and "stream_orderign". """ # Tokens really represent positions between elements, but we use @@ -767,20 +767,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if direction == 'b': order = "DESC" bounds = upper_bound( - RoomStreamToken.parse(from_key), self.database_engine + from_token, self.database_engine ) - if to_key: + if to_token: bounds = "%s AND %s" % (bounds, lower_bound( - RoomStreamToken.parse(to_key), self.database_engine + to_token, self.database_engine )) else: order = "ASC" bounds = lower_bound( - RoomStreamToken.parse(from_key), self.database_engine + from_token, self.database_engine ) - if to_key: + if to_token: bounds = "%s AND %s" % (bounds, upper_bound( - RoomStreamToken.parse(to_key), self.database_engine + to_token, self.database_engine )) filter_clause, filter_args = filter_to_clause(event_filter) @@ -821,12 +821,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = str(RoomStreamToken(topo, toke)) + next_token = RoomStreamToken(topo, toke) else: # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + next_token = to_token if to_token else from_token - return rows, next_token, + return rows, str(next_token), @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, @@ -851,6 +851,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): the keys "event_id", "toplogical_ordering" and "stream_orderign". """ + from_key = RoomStreamToken.parse(from_key) + if to_key: + to_key = RoomStreamToken.parse(to_key) + rows, token = yield self.runInteraction( "paginate_room_events", self.paginate_room_events_txn, room_id, from_key, to_key, direction, limit, event_filter, -- cgit 1.4.1 From 696f5324539b014a5a9613c3a534abb2b3725dac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:19:33 +0100 Subject: Reuse existing pagination code for context API --- synapse/storage/stream.py | 90 ++++++++--------------------------------------- 1 file changed, 15 insertions(+), 75 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ce98587608..938a8809dc 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -42,7 +42,7 @@ from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background -from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.engines import PostgresEngine import abc import logging @@ -595,88 +595,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=["stream_ordering", "topological_ordering"], ) - token = RoomStreamToken( - results["topological_ordering"], + # Paginating backwards includes the event at the token, but paginating + # forward doesn't. + before_token = RoomStreamToken( + results["topological_ordering"] - 1, results["stream_ordering"], ) - if isinstance(self.database_engine, Sqlite3Engine): - # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` - # So we give pass it to SQLite3 as the UNION ALL of the two queries. - - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering < ?" - " UNION ALL" - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) - before_args = ( - room_id, token.topological, - room_id, token.topological, token.stream, - before_limit, - ) - - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering > ?" - " UNION ALL" - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) - after_args = ( - room_id, token.topological, - room_id, token.topological, token.stream, - after_limit, - ) - else: - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) % (upper_bound(token, self.database_engine, inclusive=False),) - - before_args = (room_id, before_limit) - - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) % (lower_bound(token, self.database_engine, inclusive=False),) - - after_args = (room_id, after_limit) - - txn.execute(query_before, before_args) + after_token = RoomStreamToken( + results["topological_ordering"], + results["stream_ordering"], + ) - rows = self.cursor_to_dict(txn) + rows, start_token = self.paginate_room_events_txn( + txn, room_id, before_token, direction='b', limit=before_limit, + ) events_before = [r["event_id"] for r in rows] - if rows: - start_token = str(RoomStreamToken( - rows[0]["topological_ordering"], - rows[0]["stream_ordering"] - 1, - )) - else: - start_token = str(RoomStreamToken( - token.topological, - token.stream - 1, - )) - - txn.execute(query_after, after_args) - - rows = self.cursor_to_dict(txn) + rows, end_token = self.paginate_room_events_txn( + txn, room_id, after_token, direction='f', limit=after_limit, + ) events_after = [r["event_id"] for r in rows] - if rows: - end_token = str(RoomStreamToken( - rows[-1]["topological_ordering"], - rows[-1]["stream_ordering"], - )) - else: - end_token = str(token) - return { "before": { "event_ids": events_before, -- cgit 1.4.1 From 23ec51c94ce90fd70931fbe98cadb7dd51a9db4e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 09:55:19 +0100 Subject: Fix up comments and make function private --- synapse/storage/stream.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 938a8809dc..54be025401 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -607,12 +607,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results["stream_ordering"], ) - rows, start_token = self.paginate_room_events_txn( + rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, ) events_before = [r["event_id"] for r in rows] - rows, end_token = self.paginate_room_events_txn( + rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, ) events_after = [r["event_id"] for r in rows] @@ -678,8 +678,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, - direction='b', limit=-1, event_filter=None): + def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, + direction='b', limit=-1, event_filter=None): """Returns list of events before or after a given token. Args: @@ -697,8 +697,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts haveq - the keys "event_id", "toplogical_ordering" and "stream_orderign". + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_ordering". """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -796,7 +796,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): to_key = RoomStreamToken.parse(to_key) rows, token = yield self.runInteraction( - "paginate_room_events", self.paginate_room_events_txn, + "paginate_room_events", self._paginate_room_events_txn, room_id, from_key, to_key, direction, limit, event_filter, ) -- cgit 1.4.1 From 27cf170558865d0afeea5d4c2524e8664c28323b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 10:53:29 +0100 Subject: Refactor recent events func to use pagination func This also removes a cache that is unlikely to ever get hit. --- synapse/storage/stream.py | 75 +++++++++++++++++------------------------------ 1 file changed, 27 insertions(+), 48 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 54be025401..da43bb1321 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -38,7 +38,6 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore -from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -363,60 +362,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token)) - @cached(num_args=4) + @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): - end_token = RoomStreamToken.parse_stream_token(end_token) - - if from_token is None: - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - else: - from_token = RoomStreamToken.parse_stream_token(from_token) - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" - ) - - def get_recent_events_for_room_txn(txn): - if from_token is None: - txn.execute(sql, (room_id, end_token.stream, False, limit,)) - else: - txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, False, limit - )) + """Get the most recent events in the room in topological ordering. - rows = self.cursor_to_dict(txn) + Args: + room_id (str) + limit (int) + end_token (str): The stream token representing now. + from_token(str|None): Token to not return events before, if given. - rows.reverse() # As we selected with reverse ordering + Returns: + Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of + dicts (which include event_ids, etc), and a tuple for + `(start_token, end_token)` representing the range of rows + returned. + The returned events are in ascending order. + """ + # Allow a zero limit here, and no-op. + if limit == 0: + defer.returnValue(([], (end_token, end_token))) - if rows: - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # since we are going backwards so we subtract one from the - # stream part. - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - 1 - start_token = str(RoomStreamToken(topo, toke)) + end_token = RoomStreamToken.parse_stream_token(end_token) + if from_token is not None: + from_token = RoomStreamToken.parse(from_token) - token = (start_token, str(end_token)) - else: - token = (str(end_token), str(end_token)) + rows, token = yield self.runInteraction( + "get_recent_event_ids_for_room", self._paginate_room_events_txn, + room_id, from_token=end_token, to_token=from_token, limit=limit, + ) - return rows, token + # We want to return the results in ascending order. + rows.reverse() - return self.runInteraction( - "get_recent_events_for_room", get_recent_events_for_room_txn - ) + defer.returnValue((rows, (token, str(end_token)))) def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering -- cgit 1.4.1 From 7dd13415dbf82edeaae0b94f835e20025964e1b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 10:58:16 +0100 Subject: Remove unused from_token param --- synapse/storage/stream.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index da43bb1321..ecd39074b8 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -346,9 +346,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + def get_recent_events_for_room(self, room_id, limit, end_token): rows, token = yield self.get_recent_event_ids_for_room( - room_id, limit, end_token, from_token + room_id, limit, end_token, ) logger.debug("stream before") @@ -363,14 +363,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue((events, token)) @defer.inlineCallbacks - def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): + def get_recent_event_ids_for_room(self, room_id, limit, end_token): """Get the most recent events in the room in topological ordering. Args: room_id (str) limit (int) end_token (str): The stream token representing now. - from_token(str|None): Token to not return events before, if given. Returns: Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of @@ -384,12 +383,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue(([], (end_token, end_token))) end_token = RoomStreamToken.parse_stream_token(end_token) - if from_token is not None: - from_token = RoomStreamToken.parse(from_token) rows, token = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, - room_id, from_token=end_token, to_token=from_token, limit=limit, + room_id, from_token=end_token, limit=limit, ) # We want to return the results in ascending order. -- cgit 1.4.1 From 05e0a2462c76be6987c7ec3d9517d500583bac65 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:18:23 +0100 Subject: Refactor pagination DB API to return concrete type This makes it easier to document what is being returned by the storage functions and what some functions expect as arguments. --- synapse/storage/stream.py | 76 ++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 28 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ecd39074b8..772d2c6198 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -47,6 +47,7 @@ import abc import logging from six.moves import range +from collections import namedtuple logger = logging.getLogger(__name__) @@ -59,6 +60,12 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" +# Used as return values for pagination APIs +_EventDictReturn = namedtuple("_EventDictReturn", ( + "event_id", "topological_ordering", "stream_ordering", +)) + + def lower_bound(token, engine, inclusive=False): inclusive = "=" if inclusive else "" if token.topological is None: @@ -256,9 +263,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): " ORDER BY stream_ordering %s LIMIT ?" ) % (order,) txn.execute(sql, (room_id, from_id, to_id, limit)) + + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] else: sql = ( - "SELECT event_id, stream_ordering FROM events WHERE" + "SELECT event_id, topological_ordering, stream_ordering" + " FROM events" + " WHERE" " room_id = ?" " AND not outlier" " AND stream_ordering <= ?" @@ -266,14 +277,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) % (order, order,) txn.execute(sql, (room_id, to_id, limit)) - rows = self.cursor_to_dict(txn) + rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) ret = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) @@ -283,7 +294,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = "s%d" % min(r["stream_ordering"] for r in rows) + key = "s%d" % min(r.stream_ordering for r in rows) else: # Assume we didn't get anything because there was nothing to # get. @@ -330,14 +341,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): " ORDER BY stream_ordering ASC" ) txn.execute(sql, (user_id, to_id,)) - rows = self.cursor_to_dict(txn) + + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) @@ -353,14 +365,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): logger.debug("stream before") events = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) logger.debug("stream after") self._set_before_and_after(events, rows) - defer.returnValue((events, token)) + defer.returnValue((events, (token, end_token))) @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token): @@ -372,15 +384,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): end_token (str): The stream token representing now. Returns: - Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of - dicts (which include event_ids, etc), and a tuple for - `(start_token, end_token)` representing the range of rows - returned. - The returned events are in ascending order. + Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of + _EventDictReturn and a token pointint to the start of the returned + events. + The events returned are in ascending order. """ # Allow a zero limit here, and no-op. if limit == 0: - defer.returnValue(([], (end_token, end_token))) + defer.returnValue(([], end_token)) end_token = RoomStreamToken.parse_stream_token(end_token) @@ -392,7 +403,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # We want to return the results in ascending order. rows.reverse() - defer.returnValue((rows, (token, str(end_token)))) + defer.returnValue((rows, token)) def get_room_event_after_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or after a stream ordering @@ -496,10 +507,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @staticmethod def _set_before_and_after(events, rows, topo_order=True): + """Inserts ordering information to events' internal metadata from + the DB rows. + + Args: + events (list[FrozenEvent]) + rows (list[_EventDictReturn]) + topo_order (bool): Whether the events were ordered topologically + or by stream ordering + """ for event, row in zip(events, rows): - stream = row["stream_ordering"] - if topo_order: - topo = event.depth + stream = row.stream_ordering + if topo_order and row.topological_ordering: + topo = row.topological_ordering else: topo = None internal = event.internal_metadata @@ -586,12 +606,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, ) - events_before = [r["event_id"] for r in rows] + events_before = [r.event_id for r in rows] rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, ) - events_after = [r["event_id"] for r in rows] + events_after = [r.event_id for r in rows] return { "before": { @@ -672,9 +692,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): those that match the filter. Returns: - tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have - the keys "event_id", "toplogical_ordering" and "stream_ordering". + tuple[list[_EventDictReturn], str]: Returns the results as a list + of _EventDictReturn and a token that points to the end of the + result set. """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -725,11 +745,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) - rows = self.cursor_to_dict(txn) + rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] + topo = rows[-1].topological_ordering + toke = rows[-1].stream_ordering if direction == 'b': # Tokens are positions between events. # This token points *after* the last event in the chunk. @@ -764,7 +784,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: tuple[list[dict], str]: Returns the results as a list of dicts and a token that points to the end of the result set. The dicts have - the keys "event_id", "toplogical_ordering" and "stream_orderign". + the keys "event_id", "topological_ordering" and "stream_orderign". """ from_key = RoomStreamToken.parse(from_key) @@ -777,7 +797,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) events = yield self._get_events( - [r["event_id"] for r in rows], + [r.event_id for r in rows], get_prev_content=True ) -- cgit 1.4.1 From c4af4c24ca988832018feaf0ac5a2f6dbb8bfe68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:55:34 +0100 Subject: Refactor get_recent_events_for_room return type There is no reason to return a tuple of tokens when the last token is always the token passed as an argument. Changing it makes it consistent with other storage APIs --- synapse/handlers/initial_sync.py | 10 +++++----- synapse/handlers/sync.py | 2 +- synapse/storage/stream.py | 16 +++++++++++++++- 3 files changed, 21 insertions(+), 7 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index cd33a86599..5a9aa0c16d 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -181,8 +181,8 @@ class InitialSyncHandler(BaseHandler): self.store, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token) + end_token = now_token.copy_and_replace("room_key", room_end_token) time_now = self.clock.time_msec() d["messages"] = { @@ -325,8 +325,8 @@ class InitialSyncHandler(BaseHandler): self.store, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token[0]) - end_token = StreamToken.START.copy_and_replace("room_key", token[1]) + start_token = StreamToken.START.copy_and_replace("room_key", token) + end_token = StreamToken.START.copy_and_replace("room_key", stream_token) time_now = self.clock.time_msec() @@ -409,7 +409,7 @@ class InitialSyncHandler(BaseHandler): ) start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b52e4c2aff..c25a76d215 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -429,7 +429,7 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ - last_events, token = yield self.store.get_recent_events_for_room( + last_events, _ = yield self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 772d2c6198..b5baacd32c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -359,6 +359,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token): + """Get the most recent events in the room in topological ordering. + + Args: + room_id (str) + limit (int) + end_token (str): The stream token representing now. + + Returns: + Deferred[tuple[list[FrozenEvent], str]]: Returns a list of + events and a token pointint to the start of the returned + events. + The events returned are in ascending order. + """ + rows, token = yield self.get_recent_event_ids_for_room( room_id, limit, end_token, ) @@ -372,7 +386,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._set_before_and_after(events, rows) - defer.returnValue((events, (token, end_token))) + defer.returnValue((events, token)) @defer.inlineCallbacks def get_recent_event_ids_for_room(self, room_id, limit, end_token): -- cgit 1.4.1 From e5ab9cd24b2f63c6ca00ae6354dbcbfcd9127dfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:58:35 +0100 Subject: Don't unnecessarily require token to be stream token This allows calling the `get_recent_event_ids_for_room` function in more situations. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b5baacd32c..5e4327bb96 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if limit == 0: defer.returnValue(([], end_token)) - end_token = RoomStreamToken.parse_stream_token(end_token) + end_token = RoomStreamToken.parse(end_token) rows, token = yield self.runInteraction( "get_recent_event_ids_for_room", self._paginate_room_events_txn, -- cgit 1.4.1 From e2accd7f1d21e34181dd4543eca30ad1ea971b4c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 11:59:45 +0100 Subject: Refactor sync APIs to reuse pagination API The sync API often returns events in a topological rather than stream ordering, e.g. when the user joined the room or on initial sync. When this happens we can reuse existing pagination storage functions. --- synapse/handlers/sync.py | 19 ++++++++---- synapse/storage/stream.py | 73 +++++++++++++++++++++++------------------------ 2 files changed, 48 insertions(+), 44 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c25a76d215..b75daa340d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -354,12 +354,19 @@ class SyncHandler(object): since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) + if since_key: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + else: + events, end_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + end_token=end_key, + ) loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5e4327bb96..8bb4e85709 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -233,52 +233,49 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, order='DESC'): - # Note: If from_key is None then we return in topological order. This - # is because in that case we're using this as a "get the last few messages - # in a room" function, rather than "get new messages since last sync" - if from_key is not None: - from_id = RoomStreamToken.parse_stream_token(from_key).stream - else: - from_id = None - to_id = RoomStreamToken.parse_stream_token(to_key).stream + """Get new room events in stream ordering since `from_key`. + + Args: + room_id (str) + from_key (str): Token from which no events are returned before + to_key (str): Token from which no events are returned after. (This + is typically the current stream token) + limit (int): Maximum number of events to return + order (str): Either "DESC" or "ASC". Determines which events are + returned when the result is limited. If "DESC" then the most + recent `limit` events are returned, otherwise returns the + oldest `limit` events. + + Returns: + Deferred[tuple[list[FrozenEvent], str]]: Returns the list of + events (in ascending order) and the token from the start of + the chunk of events returned. + """ if from_key == to_key: defer.returnValue(([], from_key)) - if from_id: - has_changed = yield self._events_stream_cache.has_entity_changed( - room_id, from_id - ) + from_id = RoomStreamToken.parse_stream_token(from_key).stream + to_id = RoomStreamToken.parse_stream_token(to_key).stream - if not has_changed: - defer.returnValue(([], from_key)) + has_changed = yield self._events_stream_cache.has_entity_changed( + room_id, from_id + ) - def f(txn): - if from_id is not None: - sql = ( - "SELECT event_id, stream_ordering FROM events WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering > ? AND stream_ordering <= ?" - " ORDER BY stream_ordering %s LIMIT ?" - ) % (order,) - txn.execute(sql, (room_id, from_id, to_id, limit)) - - rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] - else: - sql = ( - "SELECT event_id, topological_ordering, stream_ordering" - " FROM events" - " WHERE" - " room_id = ?" - " AND not outlier" - " AND stream_ordering <= ?" - " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" - ) % (order, order,) - txn.execute(sql, (room_id, to_id, limit)) + if not has_changed: + defer.returnValue(([], from_key)) - rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] + def f(txn): + sql = ( + "SELECT event_id, stream_ordering FROM events WHERE" + " room_id = ?" + " AND not outlier" + " AND stream_ordering > ? AND stream_ordering <= ?" + " ORDER BY stream_ordering %s LIMIT ?" + ) % (order,) + txn.execute(sql, (room_id, from_id, to_id, limit)) + rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) -- cgit 1.4.1 From c0e08dc45b8996e88ff3fd6c7584c26f5ec2e839 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 14:31:32 +0100 Subject: Remove unused code path from member change DB func The function is never called without a from_key, so we can remove all the handling for that scenario. --- synapse/storage/stream.py | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 772d2c6198..25aef32551 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -304,10 +304,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_membership_changes_for_user(self, user_id, from_key, to_key): - if from_key is not None: - from_id = RoomStreamToken.parse_stream_token(from_key).stream - else: - from_id = None + from_id = RoomStreamToken.parse_stream_token(from_key).stream to_id = RoomStreamToken.parse_stream_token(to_key).stream if from_key == to_key: @@ -321,26 +318,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue([]) def f(txn): - if from_id is not None: - sql = ( - "SELECT m.event_id, stream_ordering FROM events AS e," - " room_memberships AS m" - " WHERE e.event_id = m.event_id" - " AND m.user_id = ?" - " AND e.stream_ordering > ? AND e.stream_ordering <= ?" - " ORDER BY e.stream_ordering ASC" - ) - txn.execute(sql, (user_id, from_id, to_id,)) - else: - sql = ( - "SELECT m.event_id, stream_ordering FROM events AS e," - " room_memberships AS m" - " WHERE e.event_id = m.event_id" - " AND m.user_id = ?" - " AND stream_ordering <= ?" - " ORDER BY stream_ordering ASC" - ) - txn.execute(sql, (user_id, to_id,)) + sql = ( + "SELECT m.event_id, stream_ordering FROM events AS e," + " room_memberships AS m" + " WHERE e.event_id = m.event_id" + " AND m.user_id = ?" + " AND e.stream_ordering > ? AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + ) + txn.execute(sql, (user_id, from_id, to_id,)) rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] -- cgit 1.4.1 From 75552d2148f9f48add495cb7b18a378b0abf70b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 15:15:38 +0100 Subject: Update comments --- synapse/storage/stream.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 772d2c6198..60d2dca154 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -385,7 +385,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of - _EventDictReturn and a token pointint to the start of the returned + _EventDictReturn and a token pointing to the start of the returned events. The events returned are in ascending order. """ @@ -514,7 +514,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): events (list[FrozenEvent]) rows (list[_EventDictReturn]) topo_order (bool): Whether the events were ordered topologically - or by stream ordering + or by stream ordering. If true then all rows should have a non + null topological_ordering. """ for event, row in zip(events, rows): stream = row.stream_ordering @@ -692,9 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): those that match the filter. Returns: - tuple[list[_EventDictReturn], str]: Returns the results as a list - of _EventDictReturn and a token that points to the end of the - result set. + Deferred[tuple[list[_EventDictReturn], str]]: Returns the results + as a list of _EventDictReturn and a token that points to the end + of the result set. """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence -- cgit 1.4.1 From 7ce98804ff2ffa8ecacf63ddad469dc0221b655e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 15:42:39 +0100 Subject: Fix up comment --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f529642101..b6d171eb9b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -368,7 +368,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: Deferred[tuple[list[FrozenEvent], str]]: Returns a list of - events and a token pointint to the start of the returned + events and a token pointing to the start of the returned events. The events returned are in ascending order. """ -- cgit 1.4.1 From 5494c1d71e47245b9750fc260462fe4c2ca3e273 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 18:15:21 +0100 Subject: Don't support limitless pagination The pagination storage function supported not specifiying a limit on the number of events returned. This was triggered when using the search or context API with a limit of zero, which the storage function took to mean not being limited. --- synapse/storage/stream.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ea24710ad8..1701d03866 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -684,8 +684,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results to only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. Zero or less - means no limit. + limit (int): The maximum number of events to return. event_filter (Filter|None): If provided filters the events to those that match the filter. @@ -694,6 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): as a list of _EventDictReturn and a token that points to the end of the result set. """ + + assert int(limit) >= 0 + # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -723,18 +725,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): bounds += " AND " + filter_clause args.extend(filter_args) - if int(limit) > 0: - args.append(int(limit)) - limit_str = " LIMIT ?" - else: - limit_str = "" + args.append(int(limit)) sql = ( "SELECT event_id, topological_ordering, stream_ordering" " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," - " stream_ordering %(order)s %(limit)s" + " stream_ordering %(order)s LIMIT ?" ) % { "bounds": bounds, "order": order, -- cgit 1.4.1 From a17e901f4dd1825d4bf21e55488939d95e1d7d80 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 May 2018 18:24:32 +0100 Subject: Remove unused string formatting param --- synapse/storage/stream.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1701d03866..fb463c525a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -736,7 +736,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) % { "bounds": bounds, "order": order, - "limit": limit_str } txn.execute(sql, args) -- cgit 1.4.1