summary refs log tree commit diff
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-10-13 00:14:08 +1100
committerGitHub <noreply@github.com>2018-10-13 00:14:08 +1100
commit381d2cfdf0f02935b743f4b6dc1b5133d7ed27b7 (patch)
treeff35333a1bd6658ff25ad6465ccc0ed6f27d4e76
parentComments on get_all_new_events_stream (diff)
downloadsynapse-381d2cfdf0f02935b743f4b6dc1b5133d7ed27b7.tar.xz
Make workers work on Py3 (#4027)
Diffstat (limited to '')
-rw-r--r--changelog.d/4027.bugfix1
-rw-r--r--synapse/app/_base.py30
-rw-r--r--synapse/app/event_creator.py3
-rw-r--r--synapse/app/pusher.py15
-rw-r--r--synapse/app/synchrotron.py12
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--synapse/replication/slave/storage/_base.py9
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py12
-rw-r--r--synapse/replication/slave/storage/devices.py11
-rw-r--r--synapse/replication/slave/storage/groups.py8
-rw-r--r--synapse/replication/slave/storage/keys.py14
-rw-r--r--synapse/replication/slave/storage/presence.py6
-rwxr-xr-xsynctl2
13 files changed, 64 insertions, 62 deletions
diff --git a/changelog.d/4027.bugfix b/changelog.d/4027.bugfix
new file mode 100644
index 0000000000..c9bbff3f68
--- /dev/null
+++ b/changelog.d/4027.bugfix
@@ -0,0 +1 @@
+Workers now start on Python 3.
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 7c866e246a..18584226e9 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,6 +17,7 @@ import gc
 import logging
 import sys
 
+import psutil
 from daemonize import Daemonize
 
 from twisted.internet import error, reactor
@@ -24,12 +25,6 @@ from twisted.internet import error, reactor
 from synapse.util import PreserveLoggingContext
 from synapse.util.rlimit import change_resource_limit
 
-try:
-    import affinity
-except Exception:
-    affinity = None
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -89,15 +84,20 @@ def start_reactor(
         with PreserveLoggingContext():
             logger.info("Running")
             if cpu_affinity is not None:
-                if not affinity:
-                    quit_with_error(
-                        "Missing package 'affinity' required for cpu_affinity\n"
-                        "option\n\n"
-                        "Install by running:\n\n"
-                        "   pip install affinity\n\n"
-                    )
-                logger.info("Setting CPU affinity to %s" % cpu_affinity)
-                affinity.set_process_affinity_mask(0, cpu_affinity)
+                # Turn the bitmask into bits, reverse it so we go from 0 up
+                mask_to_bits = bin(cpu_affinity)[2:][::-1]
+
+                cpus = []
+                cpu_num = 0
+
+                for i in mask_to_bits:
+                    if i == "1":
+                        cpus.append(cpu_num)
+                    cpu_num += 1
+
+                p = psutil.Process()
+                p.cpu_affinity(cpus)
+
             change_resource_limit(soft_file_limit)
             if gc_thresholds:
                 gc.set_threshold(*gc_thresholds)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 9060ab14f6..e4a68715aa 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -178,6 +178,9 @@ def start(config_options):
 
     setup_logging(config, use_worker_options=True)
 
+    # This should only be done on the user directory worker or the master
+    config.update_user_directory = False
+
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 630dcda478..0f9f8e19f6 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,31 +50,31 @@ class PusherSlaveStore(
     SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
-        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
     )
 
     update_pusher_failing_since = (
-        DataStore.update_pusher_failing_since.__func__
+        __func__(DataStore.update_pusher_failing_since)
     )
 
     update_pusher_last_stream_ordering = (
-        DataStore.update_pusher_last_stream_ordering.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering)
     )
 
     get_throttle_params_by_room = (
-        DataStore.get_throttle_params_by_room.__func__
+        __func__(DataStore.get_throttle_params_by_room)
     )
 
     set_throttle_params = (
-        DataStore.set_throttle_params.__func__
+        __func__(DataStore.set_throttle_params)
     )
 
     get_time_of_last_push_action_before = (
-        DataStore.get_time_of_last_push_action_before.__func__
+        __func__(DataStore.get_time_of_last_push_action_before)
     )
 
     get_profile_displayname = (
-        DataStore.get_profile_displayname.__func__
+        __func__(DataStore.get_profile_displayname)
     )
 
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9a7fc6ee9d..3926c7f263 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
         and haven't come back yet. If there are poke the master about them.
         """
         now = self.clock.time_msec()
-        for user_id, last_sync_ms in self.users_going_offline.items():
+        for user_id, last_sync_ms in list(self.users_going_offline.items()):
             if now - last_sync_ms > 10 * 1000:
                 self.users_going_offline.pop(user_id, None)
                 self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
         # TODO Hows this supposed to work?
         pass
 
-    get_states = PresenceHandler.get_states.__func__
-    get_state = PresenceHandler.get_state.__func__
-    current_state_for_users = PresenceHandler.current_state_for_users.__func__
+    get_states = __func__(PresenceHandler.get_states)
+    get_state = __func__(PresenceHandler.get_state)
+    current_state_for_users = __func__(PresenceHandler.current_state_for_users)
 
     def user_syncing(self, user_id, affect_presence):
         if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
         ) for row in rows]
 
         for state in states:
-            self.user_to_current_state[row.user_id] = state
+            self.user_to_current_state[state.user_id] = state
 
         stream_id = token
         yield self.notify_from_replication(states, stream_id)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index d4d983b00a..2947f37f1a 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -82,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = {
     "psutil": {
         "psutil>=2.0.0": ["psutil>=2.0.0"],
     },
-    "affinity": {
-        "affinity": ["affinity"],
-    },
     "postgres": {
         "psycopg2>=2.6": ["psycopg2"]
     }
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 3f7be74e02..2d81d49e9a 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -15,6 +15,8 @@
 
 import logging
 
+import six
+
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
 
@@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
 logger = logging.getLogger(__name__)
 
 
+def __func__(inp):
+    if six.PY3:
+        return inp
+    else:
+        return inp.__func__
+
+
 class BaseSlavedStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(BaseSlavedStore, self).__init__(db_conn, hs)
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 87eaa53004..4f19fd35aa 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
 from ._slaved_id_tracker import SlavedIdTracker
 
 
@@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-    get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
-    get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
-    get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
-    delete_messages_for_device = DataStore.delete_messages_for_device.__func__
-    delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
+    get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
+    get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
+    get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
+    delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
+    delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
 
     def stream_positions(self):
         result = super(SlavedDeviceInboxStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 21b8c468fa..ec2fd561cc 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,23 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import six
-
 from synapse.storage import DataStore
 from synapse.storage.end_to_end_keys import EndToEndKeyStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
 from ._slaved_id_tracker import SlavedIdTracker
 
 
-def __func__(inp):
-    if six.PY3:
-        return inp
-    else:
-        return inp.__func__
-
-
 class SlavedDeviceStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(SlavedDeviceStore, self).__init__(db_conn, hs)
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 5777f07c8d..e933b170bb 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -16,7 +16,7 @@
 from synapse.storage import DataStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
 from ._slaved_id_tracker import SlavedIdTracker
 
 
@@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
             "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
         )
 
-    get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
-    get_group_stream_token = DataStore.get_group_stream_token.__func__
-    get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
+    get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
+    get_group_stream_token = __func__(DataStore.get_group_stream_token)
+    get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
 
     def stream_positions(self):
         result = super(SlavedGroupServerStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 05ed168463..8032f53fec 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -16,7 +16,7 @@
 from synapse.storage import DataStore
 from synapse.storage.keys import KeyStore
 
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
 
 
 class SlavedKeyStore(BaseSlavedStore):
@@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
         "_get_server_verify_key"
     ]
 
-    get_server_verify_keys = DataStore.get_server_verify_keys.__func__
-    store_server_verify_key = DataStore.store_server_verify_key.__func__
+    get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
+    store_server_verify_key = __func__(DataStore.store_server_verify_key)
 
-    get_server_certificate = DataStore.get_server_certificate.__func__
-    store_server_certificate = DataStore.store_server_certificate.__func__
+    get_server_certificate = __func__(DataStore.get_server_certificate)
+    store_server_certificate = __func__(DataStore.store_server_certificate)
 
-    get_server_keys_json = DataStore.get_server_keys_json.__func__
-    store_server_keys_json = DataStore.store_server_keys_json.__func__
+    get_server_keys_json = __func__(DataStore.get_server_keys_json)
+    store_server_keys_json = __func__(DataStore.store_server_keys_json)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 80b744082a..92447b00d4 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
 from synapse.storage.presence import PresenceStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
 from ._slaved_id_tracker import SlavedIdTracker
 
 
@@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
             "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
         )
 
-    _get_active_presence = DataStore._get_active_presence.__func__
-    take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+    _get_active_presence = __func__(DataStore._get_active_presence)
+    take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
     _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
     get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
 
diff --git a/synctl b/synctl
index 356e5cb6a7..09b64459b1 100755
--- a/synctl
+++ b/synctl
@@ -280,7 +280,7 @@ def main():
             if worker.cache_factor:
                 os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
 
-            for cache_name, factor in worker.cache_factors.iteritems():
+            for cache_name, factor in iteritems(worker.cache_factors):
                 os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 
             start_worker(worker.app, configfile, worker.configfile)