From b35baf6f3c5e9f24e9af241eb2423c4b94dd5a14 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Sep 2015 15:13:10 +0100 Subject: Define __repr__ methods for StreamConfig and PaginationConfig So that they can be used with "%r" log formats. --- synapse/streams/config.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'synapse/streams') diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 2ec7c5403b..167bfe0de3 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -34,6 +34,11 @@ class SourcePaginationConfig(object): self.direction = 'f' if direction == 'f' else 'b' self.limit = int(limit) if limit is not None else None + def __repr__(self): + return ( + "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)" + ) % (self.from_key, self.to_key, self.direction, self.limit) + class PaginationConfig(object): @@ -94,10 +99,10 @@ class PaginationConfig(object): logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") - def __str__(self): + def __repr__(self): return ( - "" + "PaginationConfig(from_tok=%r, to_tok=%r," + " direction=%r, limit=%r)" ) % (self.from_token, self.to_token, self.direction, self.limit) def get_source_config(self, source_name): -- cgit 1.5.1 From a247729806a1cf7093b3c0819094338bf22affa8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Sep 2015 18:19:49 +0100 Subject: synapse/streams/events.py:StreamSource was unused --- synapse/streams/events.py | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'synapse/streams') diff --git a/synapse/streams/events.py b/synapse/streams/events.py index aaa3609aa5..8671a8fa4e 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -70,15 +70,3 @@ class EventSources(object): ), ) defer.returnValue(token) - - -class StreamSource(object): - def get_new_events_for_user(self, user, from_key, limit): - """from_key is the key within this event source.""" - raise NotImplementedError("get_new_events_for_user") - - def get_current_key(self): - raise NotImplementedError("get_current_key") - - def get_pagination_rows(self, user, pagination_config, key): - raise NotImplementedError("get_rows") -- cgit 1.5.1 From bb4dddd6c4f85bc5b07119d3f9dec31964b5b6f9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Sep 2015 18:33:34 +0100 Subject: Move NullSource out of synapse and into tests since it is only used by the tests --- synapse/streams/events.py | 16 ---------------- tests/rest/client/v1/test_presence.py | 18 +++++++++++++++++- 2 files changed, 17 insertions(+), 17 deletions(-) (limited to 'synapse/streams') diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 8671a8fa4e..699083ae12 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -23,22 +23,6 @@ from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.receipts import ReceiptEventSource -class NullSource(object): - """This event source never yields any events and its token remains at - zero. It may be useful for unit-testing.""" - def __init__(self, hs): - pass - - def get_new_events_for_user(self, user, from_key, limit): - return defer.succeed(([], from_key)) - - def get_current_key(self, direction='f'): - return defer.succeed(0) - - def get_pagination_rows(self, user, pagination_config, key): - return defer.succeed(([], pagination_config.from_key)) - - class EventSources(object): SOURCE_TYPES = { "room": RoomEventSource, diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 2ee3da0b34..29d9bbaad4 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -41,6 +41,22 @@ myid = "@apple:test" PATH_PREFIX = "/_matrix/client/api/v1" +class NullSource(object): + """This event source never yields any events and its token remains at + zero. It may be useful for unit-testing.""" + def __init__(self, hs): + pass + + def get_new_events_for_user(self, user, from_key, limit): + return defer.succeed(([], from_key)) + + def get_current_key(self, direction='f'): + return defer.succeed(0) + + def get_pagination_rows(self, user, pagination_config, key): + return defer.succeed(([], pagination_config.from_key)) + + class JustPresenceHandlers(object): def __init__(self, hs): self.presence_handler = PresenceHandler(hs) @@ -243,7 +259,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): # HIDEOUS HACKERY # TODO(paul): This should be injected in via the HomeServer DI system from synapse.streams.events import ( - PresenceEventSource, NullSource, EventSources + PresenceEventSource, EventSources ) old_SOURCE_TYPES = EventSources.SOURCE_TYPES -- cgit 1.5.1 From f40b0ed5e190a78ed6633505c4f437b6fddc41ee Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Oct 2015 15:20:52 +0000 Subject: Inform the client of new room tags using v1 /events --- synapse/handlers/private_user_data.py | 46 +++++++++++++++++++++++++++++++++++ synapse/notifier.py | 2 +- synapse/rest/client/v2_alpha/tags.py | 14 ++++++++--- synapse/storage/tags.py | 16 +++++++++++- synapse/streams/events.py | 5 ++++ synapse/types.py | 22 ++++++++++------- 6 files changed, 91 insertions(+), 14 deletions(-) create mode 100644 synapse/handlers/private_user_data.py (limited to 'synapse/streams') diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py new file mode 100644 index 0000000000..1778c71325 --- /dev/null +++ b/synapse/handlers/private_user_data.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class PrivateUserDataEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + def get_current_key(self, direction='f'): + return self.store.get_max_private_user_data_stream_id() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + user_id = user.to_string() + last_stream_id = from_key + + current_stream_id = yield self.store.get_max_private_user_data_stream_id() + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + + results = [] + for room_id, room_tags in tags.items(): + results.append({ + "type": "m.tag", + "content": {"tags": room_tags}, + "room_id": room_id, + }) + + defer.returnValue((results, current_stream_id)) + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([], config.to_id)) diff --git a/synapse/notifier.py b/synapse/notifier.py index f998fc83bf..a78ee3c1e7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -270,7 +270,7 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, - from_token=StreamToken("s0", "0", "0", "0")): + from_token=StreamToken("s0", "0", "0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 15c9347fc1..486add9909 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -62,6 +62,7 @@ class TagServlet(RestServlet): super(TagServlet, self).__init__() self.auth = hs.get_auth() self.store = hs.get_datastore() + self.notifier = hs.get_notifier() @defer.inlineCallbacks def on_PUT(self, request, user_id, room_id, tag): @@ -69,9 +70,12 @@ class TagServlet(RestServlet): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.add_tag_to_room(user_id, room_id, tag) + max_id = yield self.store.add_tag_to_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) - # TODO: poke the notifier. defer.returnValue((200, {})) @defer.inlineCallbacks @@ -80,7 +84,11 @@ class TagServlet(RestServlet): if user_id != auth_user.to_string(): raise AuthError(403, "Cannot add tags for other users.") - yield self.store.remove_tag_from_room(user_id, room_id, tag) + max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) + + yield self.notifier.on_new_event( + "private_user_data_key", max_id, users=[user_id] + ) # TODO: poke the notifier. defer.returnValue((200, {})) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 64c65fc321..2d5c49144a 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -31,6 +31,14 @@ class TagsStore(SQLBaseStore): "private_user_data_max_stream_id", "stream_id" ) + def get_max_private_user_data_stream_id(self): + """Get the current max stream id for the private user data stream + + Returns: + A deferred int. + """ + return self._private_user_data_id_gen.get_max_token(self) + @cached() def get_tags_for_user(self, user_id): """Get all the tags for a user. @@ -83,7 +91,7 @@ class TagsStore(SQLBaseStore): results = {} if room_ids: - tags_by_room = yield self.get_tags_for_user(self, user_id) + tags_by_room = yield self.get_tags_for_user(user_id) for room_id in room_ids: results[room_id] = tags_by_room[room_id] @@ -129,6 +137,9 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + @defer.inlineCallbacks def remove_tag_from_room(self, user_id, room_id, tag): """Remove a tag from a room for a user. @@ -148,6 +159,9 @@ class TagsStore(SQLBaseStore): self.get_tags_for_user.invalidate((user_id,)) + result = yield self._private_user_data_id_gen.get_max_token(self) + defer.returnValue(result) + def _update_revision_txn(self, txn, user_id, room_id, next_id): """Update the latest revision of the tags for the given user and room. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 699083ae12..f0d68b5bf2 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.receipts import ReceiptEventSource +from synapse.handlers.private_user_data import PrivateUserDataEventSource class EventSources(object): @@ -29,6 +30,7 @@ class EventSources(object): "presence": PresenceEventSource, "typing": TypingNotificationEventSource, "receipt": ReceiptEventSource, + "private_user_data": PrivateUserDataEventSource, } def __init__(self, hs): @@ -52,5 +54,8 @@ class EventSources(object): receipt_key=( yield self.sources["receipt"].get_current_key() ), + private_user_data_key=( + yield self.sources["private_user_data"].get_current_key() + ), ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index 9cffc33d27..8d3a8d88cc 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -98,10 +98,13 @@ class EventID(DomainSpecificString): class StreamToken( - namedtuple( - "Token", - ("room_key", "presence_key", "typing_key", "receipt_key") - ) + namedtuple("Token", ( + "room_key", + "presence_key", + "typing_key", + "receipt_key", + "private_user_data_key", + )) ): _SEPARATOR = "_" @@ -128,13 +131,14 @@ class StreamToken( else: return int(self.room_key[1:].split("-")[-1]) - def is_after(self, other_token): + def is_after(self, other): """Does this token contain events that the other doesn't?""" return ( - (other_token.room_stream_id < self.room_stream_id) - or (int(other_token.presence_key) < int(self.presence_key)) - or (int(other_token.typing_key) < int(self.typing_key)) - or (int(other_token.receipt_key) < int(self.receipt_key)) + (other.room_stream_id < self.room_stream_id) + or (int(other.presence_key) < int(self.presence_key)) + or (int(other.typing_key) < int(self.typing_key)) + or (int(other.receipt_key) < int(self.receipt_key)) + or (int(other.private_user_data_key) < int(self.private_user_data_key)) ) def copy_and_advance(self, key, new_value): -- cgit 1.5.1