-rwxr-xr-x  synapse/app/synctl.py                            | 20
-rw-r--r--  synapse/handlers/message.py                      |  2
-rw-r--r--  synapse/handlers/room_list.py                    |  9
-rw-r--r--  synapse/handlers/room_member.py                  | 44
-rw-r--r--  synapse/handlers/sync.py                         |  6
-rw-r--r--  synapse/push/httppusher.py                       |  7
-rw-r--r--  synapse/replication/slave/storage/client_ips.py  |  2
-rw-r--r--  synapse/replication/tcp/streams.py               |  2
-rw-r--r--  synapse/rest/client/v1/admin.py                  | 22
-rw-r--r--  synapse/storage/client_ips.py                    |  2
-rw-r--r--  synapse/storage/roommember.py                    | 11
-rw-r--r--  synapse/storage/search.py                        |  2
-rw-r--r--  tests/rest/client/v1/test_rooms.py               | 12
13 files changed, 97 insertions, 44 deletions
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py

index d658f967ba..356e5cb6a7 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -111,7 +111,7 @@ def stop(pidfile, app):
 
 
 Worker = collections.namedtuple("Worker", [
-    "app", "configfile", "pidfile", "cache_factor"
+    "app", "configfile", "pidfile", "cache_factor", "cache_factors",
 ])
 
 
@@ -218,6 +218,10 @@ def main():
                 or pidfile
             )
             worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+            worker_cache_factors = (
+                worker_config.get("synctl_cache_factors")
+                or cache_factors
+            )
             daemonize = worker_config.get("daemonize") or config.get("daemonize")
             assert daemonize, "Main process must have daemonize set to true"
 
@@ -233,8 +237,10 @@ def main():
             assert worker_daemonize, "In config %r: expected '%s' to be True" % (
                 worker_configfile, "worker_daemonize")
             worker_cache_factor = worker_config.get("synctl_cache_factor")
+            worker_cache_factors = worker_config.get("synctl_cache_factors", {})
             workers.append(Worker(
                 worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+                worker_cache_factors,
             ))
 
     action = options.action
@@ -269,15 +275,19 @@ def main():
             start(configfile)
 
         for worker in workers:
+            env = os.environ.copy()
+
             if worker.cache_factor:
                 os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
 
+            for cache_name, factor in worker.cache_factors.iteritems():
+                os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
             start_worker(worker.app, configfile, worker.configfile)
 
-            if cache_factor:
-                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-            else:
-                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
+            # Reset env back to the original
+            os.environ.clear()
+            os.environ.update(env)
 
 
 if __name__ == "__main__":
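The per-worker cache settings above are passed purely through the environment. As a rough illustration of how a worker process could resolve them, here is a minimal sketch, assuming a helper named get_cache_factor and a 0.5 default; neither is Synapse's actual cache code, the point is only the SYNAPSE_CACHE_FACTOR / SYNAPSE_CACHE_FACTOR_<NAME> precedence.

    import os

    # Illustrative sketch only: per-cache override first, then the global
    # factor, then an assumed default of 0.5. Not Synapse's real implementation.
    def get_cache_factor(cache_name, default=0.5):
        per_cache = os.environ.get("SYNAPSE_CACHE_FACTOR_" + cache_name.upper())
        if per_cache is not None:
            return float(per_cache)
        return float(os.environ.get("SYNAPSE_CACHE_FACTOR", default))

    print(get_cache_factor("get_users_in_room"))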
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061cc0..ab6b5d2b0e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -203,7 +203,7 @@ class MessageHandler(object):
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if False and requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 38e1737ec9..c05aa7ba65 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs, "room_list")
-        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
-                                                   timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(
+            hs, "room_list", timeout_ms=10 * 60 * 1000,
+        )
+        self.remote_response_cache = ResponseCache(
+            hs, "remote_room_list", timeout_ms=30 * 1000,
+        )
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
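Both caches here deduplicate identical requests and keep the finished response around for timeout_ms, so raising the local room-list timeout to ten minutes means the expensive public-room listing is recomputed at most that often. A simplified, synchronous sketch of that behaviour; the class below is an illustration, not Synapse's ResponseCache:

    import time

    class SimpleResponseCache(object):
        """Illustrative stand-in for a response cache: remember a computed
        response for timeout_ms so repeated identical requests are answered
        from memory instead of being recomputed."""

        def __init__(self, timeout_ms):
            self.timeout = timeout_ms / 1000.0
            self._entries = {}  # key -> (expiry_time, value)

        def wrap(self, key, compute):
            now = time.time()
            entry = self._entries.get(key)
            if entry and entry[0] > now:
                return entry[1]
            value = compute()
            self._entries[key] = (now + self.timeout, value)
            return value

    cache = SimpleResponseCache(timeout_ms=10 * 60 * 1000)
    room_list = cache.wrap(("public_rooms", None), lambda: {"chunk": []})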
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..5a587ec7b5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
         self.event_creation_hander = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
+        self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
@@ -241,18 +242,37 @@ class RoomMemberHandler(object):
     ):
         key = (room_id,)
 
-        with (yield self.member_linearizer.queue(key)):
-            result = yield self._update_membership(
-                requester,
-                target,
-                room_id,
-                action,
-                txn_id=txn_id,
-                remote_room_hosts=remote_room_hosts,
-                third_party_signed=third_party_signed,
-                ratelimit=ratelimit,
-                content=content,
-            )
+        as_id = object()
+        if requester.app_service:
+            as_id = requester.app_service.id
+
+        then = self.clock.time_msec()
+
+        with (yield self.member_limiter.queue(as_id)):
+            diff = self.clock.time_msec() - then
+
+            if diff > 80 * 1000:
+                # haproxy would have timed the request out anyway...
+                raise SynapseError(504, "took to long to process")
+
+            with (yield self.member_linearizer.queue(key)):
+                diff = self.clock.time_msec() - then
+
+                if diff > 80 * 1000:
+                    # haproxy would have timed the request out anyway...
+                    raise SynapseError(504, "took to long to process")
+
+                result = yield self._update_membership(
+                    requester,
+                    target,
+                    room_id,
+                    action,
+                    txn_id=txn_id,
+                    remote_room_hosts=remote_room_hosts,
+                    third_party_signed=third_party_signed,
+                    ratelimit=ratelimit,
+                    content=content,
+                )
 
         defer.returnValue(result)
 
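The member_limiter caps how many membership changes a single app service can have in flight at once (ten here), and both queue waits give up with a 504 once the caller has already waited longer than the proxy timeout. A standalone, non-Twisted sketch of that pattern, using illustrative names (KeyedLimiter, the 80 s deadline) rather than Synapse's Linearizer API:

    import threading
    from contextlib import contextmanager

    class KeyedLimiter(object):
        """Illustrative: allow at most max_count concurrent holders per key,
        and give up if a slot cannot be obtained before the deadline."""

        def __init__(self, max_count):
            self.max_count = max_count
            self._semaphores = {}
            self._lock = threading.Lock()

        @contextmanager
        def queue(self, key, timeout_s=80.0):
            with self._lock:
                sem = self._semaphores.setdefault(
                    key, threading.BoundedSemaphore(self.max_count)
                )
            if not sem.acquire(timeout=timeout_s):
                raise RuntimeError("504: took too long to process")
            try:
                yield
            finally:
                sem.release()

    limiter = KeyedLimiter(max_count=10)
    with limiter.queue("appservice_irc"):
        pass  # perform the membership update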
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9bca4e7067..456eaf86cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -36,6 +36,8 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
+
 # Store the cache that tracks which lazy-loaded members have been sent to a given
 # client for no more than 30 minutes.
 LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -192,7 +194,9 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 81e18bcf7d..a4e8cafdc9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -331,7 +331,12 @@ class HttpPusher(object):
         if not notification_dict:
             defer.returnValue([])
         try:
-            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+            url = self.url.replace(
+                "https://matrix.org/_matrix/push/v1/notify",
+                "http://http-priv.matrix.org/_matrix/push/v1/notify",
+            )
+
+            resp = yield self.http_client.post_json_get_json(url, notification_dict)
         except Exception:
             logger.warn(
                 "Failed to push event %s to %s",
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return
 
+        self.client_ip_last_seen.prefill(key, now)
+
         self.hs.get_tcp_replication().send_user_ip(
             user_id, access_token, ip, user_agent, device_id, now
         )
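Prefilling client_ip_last_seen before the replication call means the next hit for the same (user, access token, IP) key within LAST_SEEN_GRANULARITY is dropped by the early return above, rather than being re-sent to the master every time. A standalone sketch of that throttling, using a plain dict in place of the slave store's cache:

    import time

    LAST_SEEN_GRANULARITY = 10 * 60 * 1000  # ms, matching the storage change later in this diff

    _last_seen = {}  # (user_id, access_token, ip) -> last seen, in ms

    def maybe_send_user_ip(key, send):
        now = int(time.time() * 1000)
        last = _last_seen.get(key)
        if last is not None and (now - last) < LAST_SEEN_GRANULARITY:
            return  # seen recently; skip the replication round trip
        _last_seen[key] = now  # the "prefill" step
        send()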
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 55fe701c5c..ab65a6792e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
 
 
 EventStreamRow = namedtuple("EventStreamRow", (
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 41534b8c2a..b9c3bc4f9f 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -457,17 +457,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         )
         new_room_id = info["room_id"]
 
-        yield self.event_creation_handler.create_and_send_nonmember_event(
-            room_creator_requester,
-            {
-                "type": "m.room.message",
-                "content": {"body": message, "msgtype": "m.text"},
-                "room_id": new_room_id,
-                "sender": new_room_user_id,
-            },
-            ratelimit=False,
-        )
-
         requester_user_id = requester.user.to_string()
 
         logger.info("Shutting down room %r", room_id)
@@ -505,6 +494,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
             kicked_users.append(user_id)
 
+        yield self.event_creation_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
         aliases_for_room = yield self.store.get_aliases_for_room(room_id)
 
         yield self.store.update_aliases_for_room(
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..ea5a969f4c 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
 
 
 class ClientIpStore(background_updates.BackgroundUpdateStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0707f9a86a..d384526ba5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -72,7 +72,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
         defer.returnValue(hosts)
 
-    @cached(max_entries=100000, iterable=True)
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
             sql = (
@@ -86,7 +86,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
             txn.execute(sql, (room_id, Membership.JOIN,))
             return [to_ascii(r[0]) for r in txn]
-        return self.runInteraction("get_users_in_room", f)
+        start_time = self._clock.time_msec()
+        result = yield self.runInteraction("get_users_in_room", f)
+        end_time = self._clock.time_msec()
+        logger.info(
+            "Fetched room membership for %s (%i users) in %i ms",
+            room_id, len(result), end_time - start_time,
+        )
+        defer.returnValue(result)
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
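Switching the decorator to cachedInlineCallbacks is what allows the yield/returnValue style above, so the database call can be timed around the deferred. A generic, synchronous sketch of that timing pattern (the decorator name here is illustrative, not Synapse code):

    import logging
    import time
    from functools import wraps

    logger = logging.getLogger(__name__)

    def log_duration(label):
        # Log how long the wrapped call took, mirroring the logger.info above.
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start = time.monotonic()
                result = func(*args, **kwargs)
                logger.info("%s took %i ms", label, (time.monotonic() - start) * 1000)
                return result
            return wrapper
        return decorator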
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index d5b5df93e6..a35291a3f6 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -724,7 +724,7 @@ def _parse_query(database_engine, search_term):
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
 
     if isinstance(database_engine, PostgresEngine):
-        return " & ".join(result + ":*" for result in results)
+        return " & ".join(result for result in results)
     elif isinstance(database_engine, Sqlite3Engine):
         return " & ".join(result + "*" for result in results)
     else:
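On Postgres the ":*" suffix requests prefix matching in the tsquery, so dropping it means a search for "foo bar" now produces "foo & bar" (whole-word matches) instead of "foo:* & bar:*". A quick standalone illustration of the two behaviours, modelled on _parse_query's tokenisation:

    import re

    def parse_query(search_term, prefix_match=False):
        # prefix_match=True reproduces the old behaviour, False the new one.
        results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
        suffix = ":*" if prefix_match else ""
        return " & ".join(result + suffix for result in results)

    print(parse_query("foo bar", prefix_match=True))   # foo:* & bar:*
    print(parse_query("foo bar", prefix_match=False))  # foo & bar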
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 359f7777ff..f2aba3c3f4 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -761,11 +761,13 @@ class RoomInitialSyncTestCase(RoomBase):
 
         self.assertTrue("presence" in channel.json_body)
 
-        presence_by_user = {
-            e["content"]["user_id"]: e for e in channel.json_body["presence"]
-        }
-        self.assertTrue(self.user_id in presence_by_user)
-        self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
+        # presence is turned off on hotfixes
+
+        # presence_by_user = {
+        #     e["content"]["user_id"]: e for e in channel.json_body["presence"]
+        # }
+        # self.assertTrue(self.user_id in presence_by_user)
+        # self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
 
 
 class RoomMessageListTestCase(RoomBase):