diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index d2bae4ad03..e6db28333f 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -37,6 +37,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,6 +51,35 @@ from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.frontend_proxy")
+class PresenceStatusStubServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+ def __init__(self, hs):
+ super(PresenceStatusStubServlet, self).__init__(hs)
+ self.http_client = hs.get_simple_http_client()
+ self.auth = hs.get_auth()
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id):
+ # Pass through the auth headers, if any, in case the access token
+ # is there.
+ auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+ headers = {
+ "Authorization": auth_headers,
+ }
+ result = yield self.http_client.get_json(
+ self.main_uri + request.uri,
+ headers=headers,
+ )
+ defer.returnValue((200, result))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id):
+ yield self.auth.get_user_by_req(request)
+ defer.returnValue((200, {}))
+
+
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -136,6 +166,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
+ PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 82f06ea185..67a2d2b7db 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -116,6 +116,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+ return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
@@ -213,6 +214,8 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
+ # presence is disabled on matrix.org, so we return the empty set
+ return set()
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 56ae086128..8c525459c7 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -108,7 +108,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor"
+ "app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
@@ -215,6 +215,10 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+ worker_cache_factors = (
+ worker_config.get("synctl_cache_factors")
+ or cache_factors
+ )
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -230,8 +234,10 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
+ worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+ worker_cache_factors,
))
action = options.action
@@ -266,15 +272,19 @@ def main():
start(configfile)
for worker in workers:
+ env = os.environ.copy()
+
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+ for cache_name, factor in worker.cache_factors.iteritems():
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
start_worker(worker.app, configfile, worker.configfile)
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
- else:
- os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
+ # Reset env back to the original
+ os.environ.clear()
+ os.environ.update(env)
if __name__ == "__main__":
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index d72b057e28..b9d0639a1b 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -303,6 +303,7 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
+ return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 71af86fe21..5242309c1e 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -373,6 +373,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
+ defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8467284758..c2807ddacd 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -390,7 +390,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or becuase there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7db59fba00..cc4d0fa5c2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -391,6 +391,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
+ return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -420,6 +421,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
+ affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -465,6 +467,8 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
+ # presence is disabled on matrix.org, so we return the empty set
+ return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fc507cef36..878db0db1e 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -46,9 +46,12 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
- self.response_cache = ResponseCache(hs, "room_list")
- self.remote_response_cache = ResponseCache(hs, "remote_room_list",
- timeout_ms=30 * 1000)
+ self.response_cache = ResponseCache(
+ hs, "room_list", timeout_ms=10 * 60 * 1000,
+ )
+ self.remote_response_cache = ResponseCache(
+ hs, "remote_room_list", timeout_ms=30 * 1000,
+ )
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f930e939e8..edc33e466a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -31,7 +31,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
-from synapse.util.async import Linearizer
+from synapse.util.async import Linearizer, Limiter
from synapse.util.distributor import user_left_room, user_joined_room
@@ -68,6 +68,7 @@ class RoomMemberHandler(object):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Limiter(10)
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -241,18 +242,23 @@ class RoomMemberHandler(object):
):
key = (room_id,)
- with (yield self.member_linearizer.queue(key)):
- result = yield self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ with (yield self.member_limiter.queue(as_id)):
+ with (yield self.member_linearizer.queue(key)):
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ )
defer.returnValue(result)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7f486e48e5..dc2446bbed 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -32,6 +32,7 @@ from six import itervalues, iteritems
logger = logging.getLogger(__name__)
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -178,7 +179,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+ )
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -620,7 +623,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
- if not block_all_presence_data:
+ if False and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 7a481b5a1e..cb63d7917f 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -331,7 +331,12 @@ class HttpPusher(object):
if not notification_dict:
defer.returnValue([])
try:
- resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
+ url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://http-priv.matrix.org/_matrix/push/v1/notify",
+ )
+
+ resp = yield self.http_client.post_json_get_json(url, notification_dict)
except Exception:
logger.warn(
"Failed to push event %s to %s",
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 352c9a2aa8..fedf7a3188 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
+ self.client_ip_last_seen.prefill(key, now)
+
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4c60bf79f9..07d99dd63c 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -33,7 +33,7 @@ import logging
logger = logging.getLogger(__name__)
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 8fb08dc526..638a7dba34 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -323,17 +323,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
- yield self.event_creation_handler.create_and_send_nonmember_event(
- room_creator_requester,
- {
- "type": "m.room.message",
- "content": {"body": message, "msgtype": "m.text"},
- "room_id": new_room_id,
- "sender": new_room_user_id,
- },
- ratelimit=False,
- )
-
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -371,6 +360,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ room_creator_requester,
+ {
+ "type": "m.room.message",
+ "content": {"body": message, "msgtype": "m.text"},
+ "room_id": new_room_id,
+ "sender": new_room_user_id,
+ },
+ ratelimit=False,
+ )
+
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 647994bd53..a975666d12 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -83,7 +83,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
- yield self.presence_handler.set_state(user, state)
+ # yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 968d2fed22..30d3541cf7 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 829cc4a207..682c637c04 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -67,7 +67,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
- @cached(max_entries=100000, iterable=True)
+ @cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -81,7 +81,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
- return self.runInteraction("get_users_in_room", f)
+ start_time = self._clock.time_msec()
+ result = yield self.runInteraction("get_users_in_room", f)
+ end_time = self._clock.time_msec()
+ logger.info(
+ "Fetched room membership for %s (%i users) in %i ms",
+ room_id, len(result), end_time - start_time,
+ )
+ defer.returnValue(result)
@cached()
def get_invited_rooms_for_user(self, user_id):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..ef4f587d8c 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -722,7 +722,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 61d737725b..5bc0ba1934 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase):
self.assertTrue("presence" in response)
- presence_by_user = {
- e["content"]["user_id"]: e for e in response["presence"]
- }
- self.assertTrue(self.user_id in presence_by_user)
- self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
+ # presence is turned off on hotfixes
+
+ # presence_by_user = {
+ # e["content"]["user_id"]: e for e in response["presence"]
+ # }
+ # self.assertTrue(self.user_id in presence_by_user)
+ # self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RestTestCase):
|