summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/private_user_data.py46
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/rest/client/v2_alpha/tags.py14
-rw-r--r--synapse/storage/tags.py16
-rw-r--r--synapse/streams/events.py5
-rw-r--r--synapse/types.py22
6 files changed, 91 insertions, 14 deletions
diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py
new file mode 100644
index 0000000000..1778c71325
--- /dev/null
+++ b/synapse/handlers/private_user_data.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class PrivateUserDataEventSource(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    def get_current_key(self, direction='f'):
+        return self.store.get_max_private_user_data_stream_id()
+
+    @defer.inlineCallbacks
+    def get_new_events_for_user(self, user, from_key, limit):
+        user_id = user.to_string()
+        last_stream_id = from_key
+
+        current_stream_id = yield self.store.get_max_private_user_data_stream_id()
+        tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
+        results = []
+        for room_id, room_tags in tags.items():
+            results.append({
+                "type": "m.tag",
+                "content": {"tags": room_tags},
+                "room_id": room_id,
+            })
+
+        defer.returnValue((results, current_stream_id))
+
+    @defer.inlineCallbacks
+    def get_pagination_rows(self, user, config, key):
+        defer.returnValue(([], config.to_id))
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f998fc83bf..a78ee3c1e7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -270,7 +270,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def wait_for_events(self, user, rooms, timeout, callback,
-                        from_token=StreamToken("s0", "0", "0", "0")):
+                        from_token=StreamToken("s0", "0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py
index 15c9347fc1..486add9909 100644
--- a/synapse/rest/client/v2_alpha/tags.py
+++ b/synapse/rest/client/v2_alpha/tags.py
@@ -62,6 +62,7 @@ class TagServlet(RestServlet):
         super(TagServlet, self).__init__()
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
     def on_PUT(self, request, user_id, room_id, tag):
@@ -69,9 +70,12 @@ class TagServlet(RestServlet):
         if user_id != auth_user.to_string():
             raise AuthError(403, "Cannot add tags for other users.")
 
-        yield self.store.add_tag_to_room(user_id, room_id, tag)
+        max_id = yield self.store.add_tag_to_room(user_id, room_id, tag)
+
+        yield self.notifier.on_new_event(
+            "private_user_data_key", max_id, users=[user_id]
+        )
 
-        # TODO: poke the notifier.
         defer.returnValue((200, {}))
 
     @defer.inlineCallbacks
@@ -80,7 +84,11 @@ class TagServlet(RestServlet):
         if user_id != auth_user.to_string():
             raise AuthError(403, "Cannot add tags for other users.")
 
-        yield self.store.remove_tag_from_room(user_id, room_id, tag)
+        max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag)
+
+        yield self.notifier.on_new_event(
+            "private_user_data_key", max_id, users=[user_id]
+        )
 
         # TODO: poke the notifier.
         defer.returnValue((200, {}))
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 64c65fc321..2d5c49144a 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -31,6 +31,14 @@ class TagsStore(SQLBaseStore):
             "private_user_data_max_stream_id", "stream_id"
         )
 
+    def get_max_private_user_data_stream_id(self):
+        """Get the current max stream id for the private user data stream
+
+        Returns:
+            A deferred int.
+        """
+        return self._private_user_data_id_gen.get_max_token(self)
+
     @cached()
     def get_tags_for_user(self, user_id):
         """Get all the tags for a user.
@@ -83,7 +91,7 @@ class TagsStore(SQLBaseStore):
 
         results = {}
         if room_ids:
-            tags_by_room = yield self.get_tags_for_user(self, user_id)
+            tags_by_room = yield self.get_tags_for_user(user_id)
             for room_id in room_ids:
                 results[room_id] = tags_by_room[room_id]
 
@@ -129,6 +137,9 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
     @defer.inlineCallbacks
     def remove_tag_from_room(self, user_id, room_id, tag):
         """Remove a tag from a room for a user.
@@ -148,6 +159,9 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
         """Update the latest revision of the tags for the given user and room.
 
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 699083ae12..f0d68b5bf2 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.room import RoomEventSource
 from synapse.handlers.typing import TypingNotificationEventSource
 from synapse.handlers.receipts import ReceiptEventSource
+from synapse.handlers.private_user_data import PrivateUserDataEventSource
 
 
 class EventSources(object):
@@ -29,6 +30,7 @@ class EventSources(object):
         "presence": PresenceEventSource,
         "typing": TypingNotificationEventSource,
         "receipt": ReceiptEventSource,
+        "private_user_data": PrivateUserDataEventSource,
     }
 
     def __init__(self, hs):
@@ -52,5 +54,8 @@ class EventSources(object):
             receipt_key=(
                 yield self.sources["receipt"].get_current_key()
             ),
+            private_user_data_key=(
+                yield self.sources["private_user_data"].get_current_key()
+            ),
         )
         defer.returnValue(token)
diff --git a/synapse/types.py b/synapse/types.py
index 9cffc33d27..8d3a8d88cc 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -98,10 +98,13 @@ class EventID(DomainSpecificString):
 
 
 class StreamToken(
-    namedtuple(
-        "Token",
-        ("room_key", "presence_key", "typing_key", "receipt_key")
-    )
+    namedtuple("Token", (
+        "room_key",
+        "presence_key",
+        "typing_key",
+        "receipt_key",
+        "private_user_data_key",
+    ))
 ):
     _SEPARATOR = "_"
 
@@ -128,13 +131,14 @@ class StreamToken(
         else:
             return int(self.room_key[1:].split("-")[-1])
 
-    def is_after(self, other_token):
+    def is_after(self, other):
         """Does this token contain events that the other doesn't?"""
         return (
-            (other_token.room_stream_id < self.room_stream_id)
-            or (int(other_token.presence_key) < int(self.presence_key))
-            or (int(other_token.typing_key) < int(self.typing_key))
-            or (int(other_token.receipt_key) < int(self.receipt_key))
+            (other.room_stream_id < self.room_stream_id)
+            or (int(other.presence_key) < int(self.presence_key))
+            or (int(other.typing_key) < int(self.typing_key))
+            or (int(other.receipt_key) < int(self.receipt_key))
+            or (int(other.private_user_data_key) < int(self.private_user_data_key))
         )
 
     def copy_and_advance(self, key, new_value):