diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/receipts.py | 6 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 1 | ||||
-rw-r--r-- | synapse/notifier.py | 40 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 12 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 36 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 1 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 2 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 102 | ||||
-rw-r--r-- | synapse/storage/schema/delta/39/membership_profile.sql | 20 | ||||
-rw-r--r-- | synapse/storage/stream.py | 3 | ||||
-rw-r--r-- | synapse/util/__init__.py | 7 |
11 files changed, 201 insertions, 29 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 916e80a48e..50aa513935 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -100,7 +100,7 @@ class ReceiptsHandler(BaseHandler): if not res: # res will be None if this read receipt is 'old' - defer.returnValue(False) + continue stream_id, max_persisted_id = res @@ -109,6 +109,10 @@ class ReceiptsHandler(BaseHandler): if max_batch_id is None or max_persisted_id > max_batch_id: max_batch_id = max_persisted_id + if min_batch_id is None: + # no new receipts + defer.returnValue(False) + affected_room_ids = list(set([r["room_id"] for r in receipts])) with PreserveLoggingContext(): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a86996689c..b62773dcbe 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,6 +510,7 @@ class SyncHandler(object): Returns: Deferred(SyncResult) """ + logger.info("Calculating sync response for %r", sync_config.user) # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability diff --git a/synapse/notifier.py b/synapse/notifier.py index 054ca59ad2..acbd4bb5ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -294,14 +295,7 @@ class Notifier(object): result = None if timeout: - # Will be set to a _NotificationListener that we'll be waiting on. - # Allows us to cancel it. - listener = None - - def timed_out(): - if listener: - listener.deferred.cancel() - timer = self.clock.call_later(timeout / 1000., timed_out) + end_time = self.clock.time_msec() + timeout prev_token = from_token while not result: @@ -312,6 +306,10 @@ class Notifier(object): if result: break + now = self.clock.time_msec() + if end_time <= now: + break + # Now we wait for the _NotifierUserStream to be told there # is a new token. # We need to supply the token we supplied to callback so @@ -319,11 +317,14 @@ class Notifier(object): prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - - self.clock.cancel_call_later(timer, ignore_errs=True) else: current_token = user_stream.current_token result = yield callback(from_token, current_token) @@ -492,22 +493,27 @@ class Notifier(object): """ listener = _NotificationListener(None) - def timed_out(): - listener.deferred.cancel() + end_time = self.clock.time_msec() + timeout - timer = self.clock.call_later(timeout / 1000., timed_out) while True: listener.deferred = self.replication_deferred.observe() result = yield callback() if result: break + now = self.clock.time_msec() + if end_time <= now: + break + try: with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - self.clock.cancel_call_later(timer, ignore_errs=True) - defer.returnValue(result) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index be55598c43..78b095c903 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -87,12 +87,12 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): - display_name = None - member_ev_id = context.current_state_ids.get((EventTypes.Member, uid)) - if member_ev_id: - member_ev = yield self.store.get_event(member_ev_id, allow_none=True) - if member_ev: - display_name = member_ev.content.get("displayname", None) + display_name = room_members.get(uid, {}).get("display_name", None) + if not display_name: + # Handle the case where we are pushing a membership event to + # that user, as they might not be already joined. + if event.type == EventTypes.Member and event.state_key == uid: + display_name = event.content.get("displayname", None) filtered = filtered_by_user[uid] if len(filtered) == 0: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3fb1f2deb3..a0bba1fa3b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -369,6 +369,24 @@ class RoomMemberListRestServlet(ClientV1RestServlet): })) +class JoinedRoomMemberListRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$") + + def __init__(self, hs): + super(JoinedRoomMemberListRestServlet, self).__init__(hs) + self.state = hs.get_state_handler() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + yield self.auth.get_user_by_req(request) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + defer.returnValue((200, { + "joined": users_with_profile + })) + + # TODO: Needs better unit testing class RoomMessageListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$") @@ -692,6 +710,22 @@ class SearchRestServlet(ClientV1RestServlet): defer.returnValue((200, results)) +class JoinedRoomsRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/joined_rooms$") + + def __init__(self, hs): + super(JoinedRoomsRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + + rooms = yield self.store.get_rooms_for_user(requester.user.to_string()) + room_ids = set(r.room_id for r in rooms) # Ensure they're unique. + defer.returnValue((200, {"joined_rooms": list(room_ids)})) + + def register_txn_path(servlet, regex_string, http_server, with_get=False): """Registers a transaction-based path. @@ -727,6 +761,7 @@ def register_servlets(hs, http_server): RoomStateEventRestServlet(hs).register(http_server) RoomCreateRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) + JoinedRoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) RoomForgetRestServlet(hs).register(http_server) @@ -738,4 +773,5 @@ def register_servlets(hs, http_server): RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + JoinedRoomsRestServlet(hs).register(http_server) RoomEventContext(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index db146ed348..fe936b3e62 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore, ) self._stream_order_on_start = self.get_room_max_stream_ordering() + self._min_stream_order_on_start = self.get_room_min_stream_ordering() super(DataStore, self).__init__(hs) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 9747a04a9a..f72d15f5ed 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -405,7 +405,7 @@ class ReceiptsStore(SQLBaseStore): room_id, receipt_type, user_id, event_ids, data ) - max_persisted_id = self._stream_id_gen.get_current_token() + max_persisted_id = self._receipts_id_gen.get_current_token() defer.returnValue((stream_id, max_persisted_id)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 866d64e679..b2a45a38c1 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id import logging +import ujson as json logger = logging.getLogger(__name__) @@ -34,7 +35,15 @@ RoomsForUser = namedtuple( ) +_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" + + class RoomMemberStore(SQLBaseStore): + def __init__(self, hs): + super(RoomMemberStore, self).__init__(hs) + self.register_background_update_handler( + _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile + ) def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. @@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore): "sender": event.user_id, "room_id": event.room_id, "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), } for event in events ] @@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore): table="room_memberships", column="event_id", iterable=member_event_ids, - retcols=['user_id'], + retcols=['user_id', 'display_name', 'avatar_url'], keyvalues={ "membership": Membership.JOIN, }, @@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore): desc="_get_joined_users_from_context", ) - users_in_room = set(row["user_id"] for row in rows) + users_in_room = { + row["user_id"]: { + "display_name": row["display_name"], + "avatar_url": row["avatar_url"], + } + for row in rows + } + if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: if event.event_id in member_event_ids: - users_in_room.add(event.state_key) + users_in_room[event.state_key] = { + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), + } defer.returnValue(users_in_room) @@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(True) defer.returnValue(False) + + @defer.inlineCallbacks + def _background_add_membership_profile(self, progress, batch_size): + target_min_stream_id = progress.get( + "target_min_stream_id_inclusive", self._min_stream_order_on_start + ) + max_stream_id = progress.get( + "max_stream_id_exclusive", self._stream_order_on_start + 1 + ) + + INSERT_CLUMP_SIZE = 1000 + + def add_membership_profile_txn(txn): + sql = (""" + SELECT stream_ordering, event_id, room_id, content + FROM events + INNER JOIN room_memberships USING (room_id, event_id) + WHERE ? <= stream_ordering AND stream_ordering < ? + AND type = 'm.room.member' + ORDER BY stream_ordering DESC + LIMIT ? + """) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = self.cursor_to_dict(txn) + if not rows: + return 0 + + min_stream_id = rows[-1]["stream_ordering"] + + to_update = [] + for row in rows: + event_id = row["event_id"] + room_id = row["room_id"] + try: + content = json.loads(row["content"]) + except: + continue + + display_name = content.get("displayname", None) + avatar_url = content.get("avatar_url", None) + + if display_name or avatar_url: + to_update.append(( + display_name, avatar_url, event_id, room_id + )) + + to_update_sql = (""" + UPDATE room_memberships SET display_name = ?, avatar_url = ? + WHERE event_id = ? AND room_id = ? + """) + for index in range(0, len(to_update), INSERT_CLUMP_SIZE): + clump = to_update[index:index + INSERT_CLUMP_SIZE] + txn.executemany(to_update_sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + } + + self._background_update_progress_txn( + txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress + ) + + return len(to_update) + + result = yield self.runInteraction( + _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn + ) + + if not result: + yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) + + defer.returnValue(result) diff --git a/synapse/storage/schema/delta/39/membership_profile.sql b/synapse/storage/schema/delta/39/membership_profile.sql new file mode 100644 index 0000000000..1bf911c8ab --- /dev/null +++ b/synapse/storage/schema/delta/39/membership_profile.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE room_memberships ADD COLUMN display_name TEXT; +ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT; + +INSERT into background_updates (update_name, progress_json) + VALUES ('room_membership_profile_update', '{}'); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7fa63b58a7..2dc24951c4 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore): def get_room_max_stream_ordering(self): return self._stream_id_gen.get_current_token() + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c05b9450be..30fc480108 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -24,6 +24,11 @@ import logging logger = logging.getLogger(__name__) +class DeferredTimedOutError(SynapseError): + def __init__(self): + super(SynapseError).__init__(504, "Timed out") + + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -89,7 +94,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(SynapseError(504, "Timed out")) + ret_deferred.errback(DeferredTimedOutError()) except: pass |