diff options
-rw-r--r-- | synapse/app/frontend_proxy.py | 31 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 3 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 28 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 1 | ||||
-rw-r--r-- | synapse/handlers/message.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 5 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 32 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/client_ips.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/admin.py | 22 | ||||
-rw-r--r-- | synapse/rest/client/v1/presence.py | 2 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 8 | ||||
-rw-r--r-- | synapse/storage/events.py | 9 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 2 | ||||
-rw-r--r-- | synapse/storage/search.py | 2 | ||||
-rw-r--r-- | synapse/storage/state.py | 5 | ||||
-rw-r--r-- | tests/rest/client/v1/test_rooms.py | 12 |
20 files changed, 119 insertions, 55 deletions
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index e32ee8fe93..cf716ed838 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,6 +50,35 @@ from twisted.web.resource import Resource logger = logging.getLogger("synapse.app.frontend_proxy") +class PresenceStatusStubServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.get_json( + self.main_uri + request.uri, + headers=headers, + ) + defer.returnValue((200, result)) + + @defer.inlineCallbacks + def on_PUT(self, request, user_id): + yield self.auth.get_user_by_req(request) + defer.returnValue((200, {})) + + class KeyUploadServlet(RestServlet): PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") @@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) + PresenceStatusStubServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f87531f1b6..6ce349cfba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -117,6 +117,7 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): + return self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) def mark_as_coming_online(self, user_id): @@ -214,6 +215,8 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): + # presence is disabled on matrix.org, so we return the empty set + return set() return [ user_id for user_id, count in self.user_to_num_current_syncs.iteritems() if count > 0 diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a141ec9953..6da60718a9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -184,17 +184,22 @@ class TransactionQueue(object): if not is_mine and send_on_behalf_of is None: continue - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], - ) + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = yield self.state.get_current_hosts_in_room( + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], + ) + except Exception: + logger.exception("Failed to calculate hosts in room") + continue + destinations = set(destinations) if send_on_behalf_of is not None: @@ -254,6 +259,7 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..df56252185 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): + defer.returnValue([]) states = yield presence_handler.get_states( [m.user_id for m in room_members], as_event=True, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c3ac03f20..7adb0ce199 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -283,7 +283,7 @@ class MessageHandler(BaseHandler): # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or becuase there # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: + if False and requester.app_service and user_id not in users_with_profile: for uid in users_with_profile: if requester.app_service.is_interested_in_user(uid): break diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cb158ba962..17f577d5cd 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -372,6 +372,7 @@ class PresenceHandler(object): """We've seen the user do something that indicates they're interacting with the app. """ + return user_id = user.to_string() bump_active_time_counter.inc() @@ -401,6 +402,7 @@ class PresenceHandler(object): Useful for streams that are not associated with an actual client that is being used by a user. """ + affect_presence = False if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 @@ -443,6 +445,8 @@ class PresenceHandler(object): Returns: set(str): A set of user_id strings. """ + # presence is disabled on matrix.org, so we return the empty set + return set() syncing_user_ids = { user_id for user_id, count in self.user_to_num_current_syncs.items() if count @@ -462,6 +466,7 @@ class PresenceHandler(object): syncing_user_ids(set(str)): The set of user_ids that are currently syncing on that server. """ + return # Grab the previous list of user_ids that were syncing on that process prev_syncing_user_ids = ( diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dfa09141ed..387b0692d8 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -44,7 +44,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs) + self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000) self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 37dc5e99ab..703c9a3a97 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -28,7 +28,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, RoomID -from synapse.util.async import Linearizer +from synapse.util.async import Linearizer, Limiter from synapse.util.distributor import user_left_room, user_joined_room from ._base import BaseHandler @@ -50,6 +50,7 @@ class RoomMemberHandler(BaseHandler): self.event_creation_hander = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") + self.member_limiter = Limiter(3) self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() @@ -161,18 +162,23 @@ class RoomMemberHandler(BaseHandler): ): key = (room_id,) - with (yield self.member_linearizer.queue(key)): - result = yield self._update_membership( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - ) + as_id = object() + if requester.app_service: + as_id = requester.app_service.id + + with (yield self.member_limiter.queue(as_id)): + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + ) defer.returnValue(result) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b12988f3c9..c95ca2ade8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -585,7 +585,7 @@ class SyncHandler(object): since_token is None and sync_config.filter_collection.blocks_all_presence() ) - if not block_all_presence_data: + if False and not block_all_presence_data: yield self._generate_sync_entry_for_presence( sync_result_builder, newly_joined_rooms, newly_joined_users ) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 352c9a2aa8..fedf7a3188 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore): if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return + self.client_ip_last_seen.prefill(key, now) + self.hs.get_tcp_replication().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 4c60bf79f9..07d99dd63c 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -33,7 +33,7 @@ import logging logger = logging.getLogger(__name__) -MAX_EVENTS_BEHIND = 10000 +MAX_EVENTS_BEHIND = 500000 EventStreamRow = namedtuple("EventStreamRow", ( diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 6073cc6fa2..b34f4e0cac 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -212,17 +212,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ) new_room_id = info["room_id"] - yield self.event_creation_handler.create_and_send_nonmember_event( - room_creator_requester, - { - "type": "m.room.message", - "content": {"body": message, "msgtype": "m.text"}, - "room_id": new_room_id, - "sender": new_room_user_id, - }, - ratelimit=False, - ) - requester_user_id = requester.user.to_string() logger.info("Shutting down room %r", room_id) @@ -260,6 +249,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): kicked_users.append(user_id) + yield self.event_creation_handler.create_and_send_nonmember_event( + room_creator_requester, + { + "type": "m.room.message", + "content": {"body": message, "msgtype": "m.text"}, + "room_id": new_room_id, + "sender": new_room_user_id, + }, + ratelimit=False, + ) + aliases_for_room = yield self.store.get_aliases_for_room(room_id) yield self.store.update_aliases_for_room( diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 4a73813c58..b7e765fa0e 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - yield self.presence_handler.set_state(user, state) + # yield self.presence_handler.set_state(user, state) defer.returnValue((200, {})) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index a03d1d6104..fba3ecc09c 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 +LAST_SEEN_GRANULARITY = 10 * 60 * 1000 class ClientIpStore(background_updates.BackgroundUpdateStore): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 8efe2fd4bb..00a79fc5ac 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -87,6 +87,8 @@ class EventPushActionsStore(SQLBaseStore): self._rotate_notif_loop = self._clock.looping_call( self._rotate_notifs, 30 * 60 * 1000 ) + self._rotate_delay = 3 + self._rotate_count = 10000 def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ @@ -629,7 +631,7 @@ class EventPushActionsStore(SQLBaseStore): ) if caught_up: break - yield sleep(5) + yield sleep(self._rotate_delay) finally: self._doing_notif_rotation = False @@ -650,8 +652,8 @@ class EventPushActionsStore(SQLBaseStore): txn.execute(""" SELECT stream_ordering FROM event_push_actions WHERE stream_ordering > ? - ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 - """, (old_rotate_stream_ordering,)) + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? + """, (old_rotate_stream_ordering, self._rotate_count)) stream_row = txn.fetchone() if stream_row: offset_stream_ordering, = stream_row diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28cce2979c..16c97c4b09 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1043,7 +1043,6 @@ class EventsStore(SQLBaseStore): "event_edge_hashes", "event_edges", "event_forward_extremities", - "event_push_actions", "event_reference_hashes", "event_search", "event_signatures", @@ -1063,6 +1062,14 @@ class EventsStore(SQLBaseStore): [(ev.event_id,) for ev, _ in events_and_contexts] ) + for table in ( + "event_push_actions", + ): + txn.executemany( + "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + def _store_event_txn(self, txn, events_and_contexts): """Insert new events into the event and event_json tables diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3e77fd3901..75ac06da85 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -675,7 +675,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(result) - @cached(max_entries=10000, iterable=True) + @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 2755acff40..52ad196e9e 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -719,7 +719,7 @@ def _parse_query(database_engine, search_term): results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) if isinstance(database_engine, PostgresEngine): - return " & ".join(result + ":*" for result in results) + return " & ".join(result for result in results) elif isinstance(database_engine, Sqlite3Engine): return " & ".join(result + "*" for result in results) else: diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b325e1c1f..43e75bf0a0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore): super(StateGroupWorkerStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + "*stateGroupCache*", 500000 * CACHE_SIZE_FACTOR ) @cached(max_entries=100000, iterable=True) @@ -546,8 +546,7 @@ class StateGroupWorkerStore(SQLBaseStore): state_dict = results[group] state_dict.update( - ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) - for k, v in group_state_dict.iteritems() + group_state_dict.iteritems() ) self._state_group_cache.update( diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 9f37255381..2e9fc84c13 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase): self.assertTrue("presence" in response) - presence_by_user = { - e["content"]["user_id"]: e for e in response["presence"] - } - self.assertTrue(self.user_id in presence_by_user) - self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) + # presence is turned off on hotfixes + + # presence_by_user = { + # e["content"]["user_id"]: e for e in response["presence"] + # } + # self.assertTrue(self.user_id in presence_by_user) + # self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) class RoomMessageListTestCase(RestTestCase): |