From f7199e873448794ffc88a08a9d7c01f1be0dfca1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Oct 2018 11:23:08 +0100 Subject: Log looping call exceptions If a looping call function errors, then it kills the loop entirely. Currently it throws away the exception logs, so we should make it actually log them. Fixes #3929 --- synapse/util/__init__.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 680ea928c7..c237d003bc 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools import logging from itertools import islice @@ -66,7 +67,7 @@ class Clock(object): f(function): The function to call repeatedly. msec(float): How long to wait between calls in milliseconds. """ - call = task.LoopingCall(f) + call = task.LoopingCall(_log_exception_wrapper(f)) call.clock = self._reactor call.start(msec / 1000.0, now=False) return call @@ -109,3 +110,19 @@ def batch_iter(iterable, size): sourceiter = iter(iterable) # call islice until it returns an empty tuple return iter(lambda: tuple(islice(sourceiter, size)), ()) + + +def _log_exception_wrapper(f): + """Used to wrap looping calls to log loudly if they get killed + """ + + @functools.wraps(f) + def wrap(*args, **kwargs): + try: + logger.info("Running looping call") + return f(*args, **kwargs) + except: # noqa: E722, as we reraise the exception this is fine. + logger.exception("Looping called died") + raise + + return wrap -- cgit 1.5.1 From 8164f6daf362b7039d6dcc8b645f306cfd5c49a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Oct 2018 11:25:09 +0100 Subject: Newsfile --- changelog.d/4008.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/4008.misc diff --git a/changelog.d/4008.misc b/changelog.d/4008.misc new file mode 100644 index 0000000000..5730210054 --- /dev/null +++ b/changelog.d/4008.misc @@ -0,0 +1 @@ +Log exceptions in looping calls -- cgit 1.5.1 From 8a1817f0d29308a233783d43cbf1ad27891120c1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Oct 2018 13:26:54 +0100 Subject: Use errback pattern and catch async failures --- synapse/handlers/appservice.py | 7 ++++++- synapse/util/__init__.py | 43 ++++++++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f0f89af7dc..16b897eb18 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -28,6 +28,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import make_log_failure_errback from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -112,7 +113,11 @@ class ApplicationServicesHandler(object): if not self.started_scheduler: def start_scheduler(): - return self.scheduler.start().addErrback(log_failure) + return self.scheduler.start().addErrback( + make_log_failure_errback( + "Application Services Failure", + ) + ) run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c237d003bc..964078aed4 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools import logging from itertools import islice @@ -67,9 +66,12 @@ class Clock(object): f(function): The function to call repeatedly. msec(float): How long to wait between calls in milliseconds. """ - call = task.LoopingCall(_log_exception_wrapper(f)) + call = task.LoopingCall(f) call.clock = self._reactor - call.start(msec / 1000.0, now=False) + d = call.start(msec / 1000.0, now=False) + d.addErrback(make_log_failure_errback( + "Looping call died", consumeErrors=False, + )) return call def call_later(self, delay, callback, *args, **kwargs): @@ -112,17 +114,30 @@ def batch_iter(iterable, size): return iter(lambda: tuple(islice(sourceiter, size)), ()) -def _log_exception_wrapper(f): - """Used to wrap looping calls to log loudly if they get killed +def make_log_failure_errback(msg, consumeErrors=True): + """Creates a function suitable for passing to `Deferred.addErrback` that + logs any failures that occur. + + Args: + msg (str): Message to log + consumeErrors (bool): If true consumes the failure, otherwise passes + on down the callback chain + + Returns: + func(Failure) """ - @functools.wraps(f) - def wrap(*args, **kwargs): - try: - logger.info("Running looping call") - return f(*args, **kwargs) - except: # noqa: E722, as we reraise the exception this is fine. - logger.exception("Looping called died") - raise + def log_failure(failure): + logger.error( + msg, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) + + if not consumeErrors: + return failure - return wrap + return log_failure -- cgit 1.5.1 From 69823205722662a72e4203ec304ff595a0f6ecf7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Oct 2018 14:06:19 +0100 Subject: Remove unnecessary extra function call layer --- synapse/handlers/appservice.py | 18 +++--------------- synapse/util/__init__.py | 29 +++++++++++++---------------- 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 16b897eb18..17eedf4dbf 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -28,7 +28,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util import make_log_failure_errback +from synapse.util import log_failure from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -37,17 +37,6 @@ logger = logging.getLogger(__name__) events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") -def log_failure(failure): - logger.error( - "Application Services Failure", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject() - ) - ) - - class ApplicationServicesHandler(object): def __init__(self, hs): @@ -114,10 +103,9 @@ class ApplicationServicesHandler(object): if not self.started_scheduler: def start_scheduler(): return self.scheduler.start().addErrback( - make_log_failure_errback( - "Application Services Failure", - ) + log_failure, "Application Services Failure", ) + run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 964078aed4..9a8fae0497 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -69,9 +69,9 @@ class Clock(object): call = task.LoopingCall(f) call.clock = self._reactor d = call.start(msec / 1000.0, now=False) - d.addErrback(make_log_failure_errback( - "Looping call died", consumeErrors=False, - )) + d.addErrback( + log_failure, "Looping call died", consumeErrors=False, + ) return call def call_later(self, delay, callback, *args, **kwargs): @@ -114,7 +114,7 @@ def batch_iter(iterable, size): return iter(lambda: tuple(islice(sourceiter, size)), ()) -def make_log_failure_errback(msg, consumeErrors=True): +def log_failure(failure, msg, consumeErrors=True): """Creates a function suitable for passing to `Deferred.addErrback` that logs any failures that occur. @@ -127,17 +127,14 @@ def make_log_failure_errback(msg, consumeErrors=True): func(Failure) """ - def log_failure(failure): - logger.error( - msg, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject() - ) + logger.error( + msg, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() ) + ) - if not consumeErrors: - return failure - - return log_failure + if not consumeErrors: + return failure -- cgit 1.5.1 From bdc27d6716d14128e25737865cc36c8adf42aeaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Oct 2018 14:15:49 +0100 Subject: Add metric to count lazy member sync requests --- synapse/handlers/sync.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 67b8ca28c7..58edf21472 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,8 @@ import logging from six import iteritems, itervalues +from prometheus_client import Counter + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -36,6 +38,13 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) + +# Counts the number of times we got asked for a lazy loaded sync. Type is one of +# initial_sync, full_sate_sync or incremental_sync +lazy_member_sync_counter = Counter( + "synapse_handlers_sync_lazy_member_sync", "", ["type"], +) + # Store the cache that tracks which lazy-loaded members have been sent to a given # client for no more than 30 minutes. LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 @@ -227,14 +236,19 @@ class SyncHandler(object): @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): + if since_token is None: + sync_type = "initial_sync" + elif full_state: + sync_type = "full_state_sync" + else: + sync_type = "incremental_sync" + context = LoggingContext.current_context() if context: - if since_token is None: - context.tag = "initial_sync" - elif full_state: - context.tag = "full_state_sync" - else: - context.tag = "incremental_sync" + context.tag = sync_type + + if sync_config.filter_collection.lazy_load_members(): + lazy_member_sync_counter.labels(sync_type).inc() if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling -- cgit 1.5.1 From 20733857abb01aee20ec9fd30dbd4b006aa5de23 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Oct 2018 14:16:48 +0100 Subject: Newsfile --- changelog.d/4022.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/4022.misc diff --git a/changelog.d/4022.misc b/changelog.d/4022.misc new file mode 100644 index 0000000000..f1d49932e1 --- /dev/null +++ b/changelog.d/4022.misc @@ -0,0 +1 @@ +Add metric to count lazy member sync requests -- cgit 1.5.1 From 395276b4051123d369aab5deab020f816479c63b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Oct 2018 09:24:39 +0100 Subject: Append _total to metric and fix up spelling --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 58edf21472..41daa1d888 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,9 +40,9 @@ logger = logging.getLogger(__name__) # Counts the number of times we got asked for a lazy loaded sync. Type is one of -# initial_sync, full_sate_sync or incremental_sync +# initial_sync, full_state_sync or incremental_sync lazy_member_sync_counter = Counter( - "synapse_handlers_sync_lazy_member_sync", "", ["type"], + "synapse_handlers_sync_lazy_member_sync_total", "", ["type"], ) # Store the cache that tracks which lazy-loaded members have been sent to a given -- cgit 1.5.1 From 3cbe8331e60559008875679fab51423eedb242c1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Oct 2018 11:23:17 +0100 Subject: Track number of non-empty sync responses instead --- synapse/handlers/sync.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 41daa1d888..5cae48436f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -39,10 +39,12 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) -# Counts the number of times we got asked for a lazy loaded sync. Type is one of -# initial_sync, full_state_sync or incremental_sync -lazy_member_sync_counter = Counter( - "synapse_handlers_sync_lazy_member_sync_total", "", ["type"], +# Counts the number of times we returned a non-empty sync. `type` is one of +# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is +# "true" or "false" depending on if the request asked for lazy loaded members or +# not. +non_empty_sync_counter = Counter( + "synapse_handlers_sync_nonempty_total", "", ["type", "lazy_loaded"], ) # Store the cache that tracks which lazy-loaded members have been sent to a given @@ -247,16 +249,12 @@ class SyncHandler(object): if context: context.tag = sync_type - if sync_config.filter_collection.lazy_load_members(): - lazy_member_sync_counter.labels(sync_type).inc() - if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( sync_config, since_token, full_state=full_state, ) - defer.returnValue(result) else: def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) @@ -265,7 +263,15 @@ class SyncHandler(object): sync_config.user.to_string(), timeout, current_sync_callback, from_token=since_token, ) - defer.returnValue(result) + + if result: + if sync_config.filter_collection.lazy_load_members(): + lazy_loaded = "true" + else: + lazy_loaded = "false" + non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + + defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None, full_state=False): -- cgit 1.5.1 From 49840f5ab23f7c3bdaee72e2bdad2d458f4446d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Oct 2018 11:31:02 +0100 Subject: Update newsfile --- changelog.d/4022.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/4022.misc b/changelog.d/4022.misc index f1d49932e1..5b0e136795 100644 --- a/changelog.d/4022.misc +++ b/changelog.d/4022.misc @@ -1 +1 @@ -Add metric to count lazy member sync requests +Add metric to count number of non-empty sync responses -- cgit 1.5.1 From 7e561b5c1a0cf0177f14e67851bb7e0ffaeda042 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Oct 2018 11:40:43 +0100 Subject: Add description to counter metric --- synapse/handlers/sync.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5cae48436f..351892a94f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -44,7 +44,11 @@ logger = logging.getLogger(__name__) # "true" or "false" depending on if the request asked for lazy loaded members or # not. non_empty_sync_counter = Counter( - "synapse_handlers_sync_nonempty_total", "", ["type", "lazy_loaded"], + "synapse_handlers_sync_nonempty_total", + "Count of non empty sync responses. type is initial_sync/full_state_sync" + "/incremental_sync. lazy_loaded indicates if lazy loaded members were " + "enabled for that request.", + ["type", "lazy_loaded"], ) # Store the cache that tracks which lazy-loaded members have been sent to a given -- cgit 1.5.1 From 8ddd0f273c501b83b75a58e379d73aef3cfab35e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 12 Oct 2018 09:55:41 +0100 Subject: Comments on get_all_new_events_stream just some docstrings to clarify the behaviour here --- synapse/storage/stream.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 4c296d72c0..d6cfdba519 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -630,7 +630,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_all_new_events_stream(self, from_id, current_id, limit): - """Get all new events""" + """Get all new events + + Returns all events with from_id < stream_ordering <= current_id. + + Args: + from_id (int): the stream_ordering of the last event we processed + current_id (int): the stream_ordering of the most recently processed event + limit (int): the maximum number of events to return + + Returns: + Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where + `next_id` is the next value to pass as `from_id` (it will either be the + stream_ordering of the last returned event, or, if fewer than `limit` events + were found, `current_id`. + """ def get_all_new_events_stream_txn(txn): sql = ( -- cgit 1.5.1 From 381d2cfdf0f02935b743f4b6dc1b5133d7ed27b7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 13 Oct 2018 00:14:08 +1100 Subject: Make workers work on Py3 (#4027) --- changelog.d/4027.bugfix | 1 + synapse/app/_base.py | 30 ++++++++++++------------ synapse/app/event_creator.py | 3 +++ synapse/app/pusher.py | 15 ++++++------ synapse/app/synchrotron.py | 12 +++++----- synapse/python_dependencies.py | 3 --- synapse/replication/slave/storage/_base.py | 9 +++++++ synapse/replication/slave/storage/deviceinbox.py | 12 +++++----- synapse/replication/slave/storage/devices.py | 11 +-------- synapse/replication/slave/storage/groups.py | 8 +++---- synapse/replication/slave/storage/keys.py | 14 +++++------ synapse/replication/slave/storage/presence.py | 6 ++--- synctl | 2 +- 13 files changed, 64 insertions(+), 62 deletions(-) create mode 100644 changelog.d/4027.bugfix diff --git a/changelog.d/4027.bugfix b/changelog.d/4027.bugfix new file mode 100644 index 0000000000..c9bbff3f68 --- /dev/null +++ b/changelog.d/4027.bugfix @@ -0,0 +1 @@ +Workers now start on Python 3. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 7c866e246a..18584226e9 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -17,6 +17,7 @@ import gc import logging import sys +import psutil from daemonize import Daemonize from twisted.internet import error, reactor @@ -24,12 +25,6 @@ from twisted.internet import error, reactor from synapse.util import PreserveLoggingContext from synapse.util.rlimit import change_resource_limit -try: - import affinity -except Exception: - affinity = None - - logger = logging.getLogger(__name__) @@ -89,15 +84,20 @@ def start_reactor( with PreserveLoggingContext(): logger.info("Running") if cpu_affinity is not None: - if not affinity: - quit_with_error( - "Missing package 'affinity' required for cpu_affinity\n" - "option\n\n" - "Install by running:\n\n" - " pip install affinity\n\n" - ) - logger.info("Setting CPU affinity to %s" % cpu_affinity) - affinity.set_process_affinity_mask(0, cpu_affinity) + # Turn the bitmask into bits, reverse it so we go from 0 up + mask_to_bits = bin(cpu_affinity)[2:][::-1] + + cpus = [] + cpu_num = 0 + + for i in mask_to_bits: + if i == "1": + cpus.append(cpu_num) + cpu_num += 1 + + p = psutil.Process() + p.cpu_affinity(cpus) + change_resource_limit(soft_file_limit) if gc_thresholds: gc.set_threshold(*gc_thresholds) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 9060ab14f6..e4a68715aa 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -178,6 +178,9 @@ def start(config_options): setup_logging(config, use_worker_options=True) + # This should only be done on the user directory worker or the master + config.update_user_directory = False + events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 630dcda478..0f9f8e19f6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage._base import __func__ from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore @@ -49,31 +50,31 @@ class PusherSlaveStore( SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( - DataStore.update_pusher_last_stream_ordering_and_success.__func__ + __func__(DataStore.update_pusher_last_stream_ordering_and_success) ) update_pusher_failing_since = ( - DataStore.update_pusher_failing_since.__func__ + __func__(DataStore.update_pusher_failing_since) ) update_pusher_last_stream_ordering = ( - DataStore.update_pusher_last_stream_ordering.__func__ + __func__(DataStore.update_pusher_last_stream_ordering) ) get_throttle_params_by_room = ( - DataStore.get_throttle_params_by_room.__func__ + __func__(DataStore.get_throttle_params_by_room) ) set_throttle_params = ( - DataStore.set_throttle_params.__func__ + __func__(DataStore.set_throttle_params) ) get_time_of_last_push_action_before = ( - DataStore.get_time_of_last_push_action_before.__func__ + __func__(DataStore.get_time_of_last_push_action_before) ) get_profile_displayname = ( - DataStore.get_profile_displayname.__func__ + __func__(DataStore.get_profile_displayname) ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9a7fc6ee9d..3926c7f263 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -33,7 +33,7 @@ from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource -from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -147,7 +147,7 @@ class SynchrotronPresence(object): and haven't come back yet. If there are poke the master about them. """ now = self.clock.time_msec() - for user_id, last_sync_ms in self.users_going_offline.items(): + for user_id, last_sync_ms in list(self.users_going_offline.items()): if now - last_sync_ms > 10 * 1000: self.users_going_offline.pop(user_id, None) self.send_user_sync(user_id, False, last_sync_ms) @@ -156,9 +156,9 @@ class SynchrotronPresence(object): # TODO Hows this supposed to work? pass - get_states = PresenceHandler.get_states.__func__ - get_state = PresenceHandler.get_state.__func__ - current_state_for_users = PresenceHandler.current_state_for_users.__func__ + get_states = __func__(PresenceHandler.get_states) + get_state = __func__(PresenceHandler.get_state) + current_state_for_users = __func__(PresenceHandler.current_state_for_users) def user_syncing(self, user_id, affect_presence): if affect_presence: @@ -208,7 +208,7 @@ class SynchrotronPresence(object): ) for row in rows] for state in states: - self.user_to_current_state[row.user_id] = state + self.user_to_current_state[state.user_id] = state stream_id = token yield self.notify_from_replication(states, stream_id) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index d4d983b00a..2947f37f1a 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -82,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = { "psutil": { "psutil>=2.0.0": ["psutil>=2.0.0"], }, - "affinity": { - "affinity": ["affinity"], - }, "postgres": { "psycopg2>=2.6": ["psycopg2"] } diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 3f7be74e02..2d81d49e9a 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -15,6 +15,8 @@ import logging +import six + from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine @@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) +def __func__(inp): + if six.PY3: + return inp + else: + return inp.__func__ + + class BaseSlavedStore(SQLBaseStore): def __init__(self, db_conn, hs): super(BaseSlavedStore, self).__init__(db_conn, hs) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 87eaa53004..4f19fd35aa 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -17,7 +17,7 @@ from synapse.storage import DataStore from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore +from ._base import BaseSlavedStore, __func__ from ._slaved_id_tracker import SlavedIdTracker @@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore): expiry_ms=30 * 60 * 1000, ) - get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ - get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ - get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ - delete_messages_for_device = DataStore.delete_messages_for_device.__func__ - delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__ + get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token) + get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device) + get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote) + delete_messages_for_device = __func__(DataStore.delete_messages_for_device) + delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote) def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 21b8c468fa..ec2fd561cc 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -13,23 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import six - from synapse.storage import DataStore from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore +from ._base import BaseSlavedStore, __func__ from ._slaved_id_tracker import SlavedIdTracker -def __func__(inp): - if six.PY3: - return inp - else: - return inp.__func__ - - class SlavedDeviceStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceStore, self).__init__(db_conn, hs) diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 5777f07c8d..e933b170bb 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -16,7 +16,7 @@ from synapse.storage import DataStore from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore +from ._base import BaseSlavedStore, __func__ from ._slaved_id_tracker import SlavedIdTracker @@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore): "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(), ) - get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__ - get_group_stream_token = DataStore.get_group_stream_token.__func__ - get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__ + get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user) + get_group_stream_token = __func__(DataStore.get_group_stream_token) + get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user) def stream_positions(self): result = super(SlavedGroupServerStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index 05ed168463..8032f53fec 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -16,7 +16,7 @@ from synapse.storage import DataStore from synapse.storage.keys import KeyStore -from ._base import BaseSlavedStore +from ._base import BaseSlavedStore, __func__ class SlavedKeyStore(BaseSlavedStore): @@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore): "_get_server_verify_key" ] - get_server_verify_keys = DataStore.get_server_verify_keys.__func__ - store_server_verify_key = DataStore.store_server_verify_key.__func__ + get_server_verify_keys = __func__(DataStore.get_server_verify_keys) + store_server_verify_key = __func__(DataStore.store_server_verify_key) - get_server_certificate = DataStore.get_server_certificate.__func__ - store_server_certificate = DataStore.store_server_certificate.__func__ + get_server_certificate = __func__(DataStore.get_server_certificate) + store_server_certificate = __func__(DataStore.store_server_certificate) - get_server_keys_json = DataStore.get_server_keys_json.__func__ - store_server_keys_json = DataStore.store_server_keys_json.__func__ + get_server_keys_json = __func__(DataStore.get_server_keys_json) + store_server_keys_json = __func__(DataStore.store_server_keys_json) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 80b744082a..92447b00d4 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -17,7 +17,7 @@ from synapse.storage import DataStore from synapse.storage.presence import PresenceStore from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore +from ._base import BaseSlavedStore, __func__ from ._slaved_id_tracker import SlavedIdTracker @@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore): "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() ) - _get_active_presence = DataStore._get_active_presence.__func__ - take_presence_startup_info = DataStore.take_presence_startup_info.__func__ + _get_active_presence = __func__(DataStore._get_active_presence) + take_presence_startup_info = __func__(DataStore.take_presence_startup_info) _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] diff --git a/synctl b/synctl index 356e5cb6a7..09b64459b1 100755 --- a/synctl +++ b/synctl @@ -280,7 +280,7 @@ def main(): if worker.cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor) - for cache_name, factor in worker.cache_factors.iteritems(): + for cache_name, factor in iteritems(worker.cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) start_worker(worker.app, configfile, worker.configfile) -- cgit 1.5.1 From fb216a22dbceebeee59aedfc6f888b73f4492908 Mon Sep 17 00:00:00 2001 From: Ivan Shapovalov Date: Sun, 14 Oct 2018 11:37:04 +0300 Subject: synapse/visibility.py: fix SyntaxError on py3.7 --- changelog.d/4033.bugfix | 1 + synapse/visibility.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/4033.bugfix diff --git a/changelog.d/4033.bugfix b/changelog.d/4033.bugfix new file mode 100644 index 0000000000..8f7a0bbbe9 --- /dev/null +++ b/changelog.d/4033.bugfix @@ -0,0 +1 @@ +Synapse now starts on Python 3.7. diff --git a/synapse/visibility.py b/synapse/visibility.py index c64ad2144c..43f48196be 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events): # Whatever else we do, we need to check for senders which have requested # erasure of their data. erased_senders = yield store.are_users_erased( - e.sender for e in events, + (e.sender for e in events), ) def redact_disallowed(event, state): -- cgit 1.5.1 From 06bc8d2fe551b4390cbca9838bc547c5d0021f6d Mon Sep 17 00:00:00 2001 From: Ivan Shapovalov Date: Sun, 14 Oct 2018 11:37:38 +0300 Subject: synapse/app: frontend_proxy.py: actually make workers work on py3 --- changelog.d/4033.bugfix | 1 + synapse/app/frontend_proxy.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/changelog.d/4033.bugfix b/changelog.d/4033.bugfix index 8f7a0bbbe9..4e9e38cdcf 100644 --- a/changelog.d/4033.bugfix +++ b/changelog.d/4033.bugfix @@ -1 +1,2 @@ Synapse now starts on Python 3.7. +_All_ workers now start on Python 3. diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index fc4b25de1c..f5c61dec5b 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet): "Authorization": auth_headers, } result = yield self.http_client.get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), headers=headers, ) defer.returnValue((200, result)) @@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet): "Authorization": auth_headers, } result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), body, headers=headers, ) -- cgit 1.5.1