summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py30
-rw-r--r--synapse/app/event_creator.py3
-rw-r--r--synapse/app/frontend_proxy.py4
-rwxr-xr-xsynapse/app/homeserver.py43
-rw-r--r--synapse/app/pusher.py21
-rw-r--r--synapse/app/synchrotron.py12
6 files changed, 64 insertions, 49 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 7c866e246a..18584226e9 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,6 +17,7 @@ import gc
 import logging
 import sys
 
+import psutil
 from daemonize import Daemonize
 
 from twisted.internet import error, reactor
@@ -24,12 +25,6 @@ from twisted.internet import error, reactor
 from synapse.util import PreserveLoggingContext
 from synapse.util.rlimit import change_resource_limit
 
-try:
-    import affinity
-except Exception:
-    affinity = None
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -89,15 +84,20 @@ def start_reactor(
         with PreserveLoggingContext():
             logger.info("Running")
             if cpu_affinity is not None:
-                if not affinity:
-                    quit_with_error(
-                        "Missing package 'affinity' required for cpu_affinity\n"
-                        "option\n\n"
-                        "Install by running:\n\n"
-                        "   pip install affinity\n\n"
-                    )
-                logger.info("Setting CPU affinity to %s" % cpu_affinity)
-                affinity.set_process_affinity_mask(0, cpu_affinity)
+                # Turn the bitmask into bits, reverse it so we go from 0 up
+                mask_to_bits = bin(cpu_affinity)[2:][::-1]
+
+                cpus = []
+                cpu_num = 0
+
+                for i in mask_to_bits:
+                    if i == "1":
+                        cpus.append(cpu_num)
+                    cpu_num += 1
+
+                p = psutil.Process()
+                p.cpu_affinity(cpus)
+
             change_resource_limit(soft_file_limit)
             if gc_thresholds:
                 gc.set_threshold(*gc_thresholds)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 9060ab14f6..e4a68715aa 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -178,6 +178,9 @@ def start(config_options):
 
     setup_logging(config, use_worker_options=True)
 
+    # This should only be done on the user directory worker or the master
+    config.update_user_directory = False
+
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index fc4b25de1c..f5c61dec5b 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
             "Authorization": auth_headers,
         }
         result = yield self.http_client.get_json(
-            self.main_uri + request.uri,
+            self.main_uri + request.uri.decode('ascii'),
             headers=headers,
         )
         defer.returnValue((200, result))
@@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
                 "Authorization": auth_headers,
             }
             result = yield self.http_client.post_json_get_json(
-                self.main_uri + request.uri,
+                self.main_uri + request.uri.decode('ascii'),
                 body,
                 headers=headers,
             )
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
 
 from six import iteritems
 
+import psutil
 from prometheus_client import Gauge
 
 from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
 
     def performance_stats_init():
         try:
-            import psutil
             process = psutil.Process()
             # Ensure we can fetch both, and make the initial request for cpu_percent
             # so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
             process.cpu_percent(interval=None)
             logger.info("report_stats can use psutil")
             stats_process.append(process)
-        except (ImportError, AttributeError):
-            logger.warn(
-                "report_stats enabled but psutil is not installed or incorrect version."
-                " Disabling reporting of memory/cpu stats."
-                " Ensuring psutil is available will help matrix.org track performance"
-                " changes across releases."
+        except (AttributeError):
+            logger.warning(
+                "Unable to read memory/cpu stats. Disabling reporting."
             )
 
     def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
     # monthly active user limiting functionality
-    clock.looping_call(
-        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
-    )
-    hs.get_datastore().reap_monthly_active_users()
+    def reap_monthly_active_users():
+        return run_as_background_process(
+            "reap_monthly_active_users",
+            hs.get_datastore().reap_monthly_active_users,
+        )
+    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+    reap_monthly_active_users()
 
     @defer.inlineCallbacks
     def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
         registered_reserved_users_mau_gauge.set(float(reserved_count))
         max_mau_gauge.set(float(hs.config.max_mau_value))
 
-    hs.get_datastore().initialise_reserved_users(
-        hs.config.mau_limits_reserved_threepids
+    def start_generate_monthly_active_users():
+        return run_as_background_process(
+            "generate_monthly_active_users",
+            generate_monthly_active_users,
+        )
+
+    # XXX is this really supposed to be a background process? it looks
+    # like it needs to complete before some of the other stuff runs.
+    run_as_background_process(
+        "initialise_reserved_users",
+        hs.get_datastore().initialise_reserved_users,
+        hs.config.mau_limits_reserved_threepids,
     )
-    generate_monthly_active_users()
+
+    start_generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
     # End of monthly active user settings
 
     if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
         clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
-        print (hs.config.pid_file)
+        print(hs.config.pid_file)
 
     _base.start_reactor(
         "synapse-homeserver",
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 630dcda478..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,31 +50,31 @@ class PusherSlaveStore(
     SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
-        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
     )
 
     update_pusher_failing_since = (
-        DataStore.update_pusher_failing_since.__func__
+        __func__(DataStore.update_pusher_failing_since)
     )
 
     update_pusher_last_stream_ordering = (
-        DataStore.update_pusher_last_stream_ordering.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering)
     )
 
     get_throttle_params_by_room = (
-        DataStore.get_throttle_params_by_room.__func__
+        __func__(DataStore.get_throttle_params_by_room)
     )
 
     set_throttle_params = (
-        DataStore.set_throttle_params.__func__
+        __func__(DataStore.set_throttle_params)
     )
 
     get_time_of_last_push_action_before = (
-        DataStore.get_time_of_last_push_action_before.__func__
+        __func__(DataStore.get_time_of_last_push_action_before)
     )
 
     get_profile_displayname = (
-        DataStore.get_profile_displayname.__func__
+        __func__(DataStore.get_profile_displayname)
     )
 
 
@@ -160,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                self.pusher_pool.on_new_notifications(
+                yield self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                self.pusher_pool.on_new_receipts(
+                yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
@@ -182,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
     def start_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+        return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
 
 
 def start(config_options):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9a7fc6ee9d..3926c7f263 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
         and haven't come back yet. If there are poke the master about them.
         """
         now = self.clock.time_msec()
-        for user_id, last_sync_ms in self.users_going_offline.items():
+        for user_id, last_sync_ms in list(self.users_going_offline.items()):
             if now - last_sync_ms > 10 * 1000:
                 self.users_going_offline.pop(user_id, None)
                 self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
         # TODO Hows this supposed to work?
         pass
 
-    get_states = PresenceHandler.get_states.__func__
-    get_state = PresenceHandler.get_state.__func__
-    current_state_for_users = PresenceHandler.current_state_for_users.__func__
+    get_states = __func__(PresenceHandler.get_states)
+    get_state = __func__(PresenceHandler.get_state)
+    current_state_for_users = __func__(PresenceHandler.current_state_for_users)
 
     def user_syncing(self, user_id, affect_presence):
         if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
         ) for row in rows]
 
         for state in states:
-            self.user_to_current_state[row.user_id] = state
+            self.user_to_current_state[state.user_id] = state
 
         stream_id = token
         yield self.notify_from_replication(states, stream_id)