diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 024474d5fe..8f156e5c84 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -322,6 +322,8 @@ class MessageHandler(BaseHandler):
user, pagination_config.get_source_config("receipt"), None
)
+ tags_by_room = yield self.store.get_tags_for_user(user_id)
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -398,6 +400,16 @@ class MessageHandler(BaseHandler):
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]
+
+ private_user_data = []
+ tags = tags_by_room.get(event.room_id)
+ if tags:
+ private_user_data.append({
+ "room_id": event.room_id,
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ d["private_user_data"] = private_user_data
except:
logger.exception("Failed to get snapshot")
@@ -447,6 +459,17 @@ class MessageHandler(BaseHandler):
result = yield self._room_initial_sync_parted(
user_id, room_id, pagin_config, member_event
)
+
+ private_user_data = []
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+ if tags:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ "room_id": room_id,
+ })
+ result["private_user_data"] = private_user_data
+
defer.returnValue(result)
@defer.inlineCallbacks
diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py
new file mode 100644
index 0000000000..1778c71325
--- /dev/null
+++ b/synapse/handlers/private_user_data.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class PrivateUserDataEventSource(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ def get_current_key(self, direction='f'):
+ return self.store.get_max_private_user_data_stream_id()
+
+ @defer.inlineCallbacks
+ def get_new_events_for_user(self, user, from_key, limit):
+ user_id = user.to_string()
+ last_stream_id = from_key
+
+ current_stream_id = yield self.store.get_max_private_user_data_stream_id()
+ tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
+ results = []
+ for room_id, room_tags in tags.items():
+ results.append({
+ "type": "m.tag",
+ "content": {"tags": room_tags},
+ "room_id": room_id,
+ })
+
+ defer.returnValue((results, current_stream_id))
+
+ @defer.inlineCallbacks
+ def get_pagination_rows(self, user, config, key):
+ defer.returnValue(([], config.to_id))
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f998fc83bf..a78ee3c1e7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -270,7 +270,7 @@ class Notifier(object):
@defer.inlineCallbacks
def wait_for_events(self, user, rooms, timeout, callback,
- from_token=StreamToken("s0", "0", "0", "0")):
+ from_token=StreamToken("s0", "0", "0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index 5831ff0e62..a108132346 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -22,6 +22,7 @@ from . import (
receipts,
keys,
tokenrefresh,
+ tags,
)
from synapse.http.server import JsonResource
@@ -44,3 +45,4 @@ class ClientV2AlphaRestResource(JsonResource):
receipts.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
+ tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py
new file mode 100644
index 0000000000..486add9909
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/tags.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import client_v2_pattern
+
+from synapse.http.servlet import RestServlet
+from synapse.api.errors import AuthError
+
+from twisted.internet import defer
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagListServlet(RestServlet):
+ """
+ GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1
+ """
+ PATTERN = client_v2_pattern(
+ "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags"
+ )
+
+ def __init__(self, hs):
+ super(TagListServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id, room_id):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot get tags for other users.")
+
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+ defer.returnValue((200, {"tags": tags}))
+
+
+class TagServlet(RestServlet):
+ """
+ PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+ DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+ """
+ PATTERN = client_v2_pattern(
+ "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags/(?P<tag>[^/]*)"
+ )
+
+ def __init__(self, hs):
+ super(TagServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.notifier = hs.get_notifier()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id, room_id, tag):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot add tags for other users.")
+
+ max_id = yield self.store.add_tag_to_room(user_id, room_id, tag)
+
+ yield self.notifier.on_new_event(
+ "private_user_data_key", max_id, users=[user_id]
+ )
+
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, user_id, room_id, tag):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot add tags for other users.")
+
+ max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag)
+
+ yield self.notifier.on_new_event(
+ "private_user_data_key", max_id, users=[user_id]
+ )
+
+ # TODO: poke the notifier.
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ TagListServlet(hs).register(http_server)
+ TagServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a1bd9c4ce9..e7443f2838 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -41,6 +41,7 @@ from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
from .search import SearchStore
+from .tags import TagsStore
import logging
@@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore,
ReceiptsStore,
EndToEndKeyStore,
SearchStore,
+ TagsStore,
):
def __init__(self, hs):
diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql
new file mode 100644
index 0000000000..168766dcf3
--- /dev/null
+++ b/synapse/storage/schema/delta/25/tags.sql
@@ -0,0 +1,37 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS room_tags(
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ tag TEXT NOT NULL, -- The name of the tag.
+ CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag)
+);
+
+CREATE TABLE IF NOT EXISTS room_tags_revisions (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ stream_id BIGINT NOT NULL, -- The current version of the room tags.
+ CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id)
+);
+
+CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0);
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
new file mode 100644
index 0000000000..2d5c49144a
--- /dev/null
+++ b/synapse/storage/tags.py
@@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
+from twisted.internet import defer
+from .util.id_generators import StreamIdGenerator
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagsStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(TagsStore, self).__init__(hs)
+
+ self._private_user_data_id_gen = StreamIdGenerator(
+ "private_user_data_max_stream_id", "stream_id"
+ )
+
+ def get_max_private_user_data_stream_id(self):
+ """Get the current max stream id for the private user data stream
+
+ Returns:
+ A deferred int.
+ """
+ return self._private_user_data_id_gen.get_max_token(self)
+
+ @cached()
+ def get_tags_for_user(self, user_id):
+ """Get all the tags for a user.
+
+
+ Args:
+ user_id(str): The user to get the tags for.
+ Returns:
+ A deferred dict mapping from room_id strings to lists of tag
+ strings.
+ """
+
+ deferred = self._simple_select_list(
+ "room_tags", {"user_id": user_id}, ["room_id", "tag"]
+ )
+
+ @deferred.addCallback
+ def tags_by_room(rows):
+ tags_by_room = {}
+ for row in rows:
+ tags_by_room.setdefault(row["room_id"], []).append(row["tag"])
+ return tags_by_room
+
+ return deferred
+
+ @defer.inlineCallbacks
+ def get_updated_tags(self, user_id, stream_id):
+ """Get all the tags for the rooms where the tags have changed since the
+ given version
+
+ Args:
+ user_id(str): The user to get the tags for.
+ stream_id(int): The earliest update to get for the user.
+ Returns:
+ A deferred dict mapping from room_id strings to lists of tag
+ strings for all the rooms that changed since the stream_id token.
+ """
+ def get_updated_tags_txn(txn):
+ sql = (
+ "SELECT room_id from room_tags_revisions"
+ " WHERE user_id = ? AND stream_id > ?"
+ )
+ txn.execute(sql, (user_id, stream_id))
+ room_ids = [row[0] for row in txn.fetchall()]
+ return room_ids
+
+ room_ids = yield self.runInteraction(
+ "get_updated_tags", get_updated_tags_txn
+ )
+
+ results = {}
+ if room_ids:
+ tags_by_room = yield self.get_tags_for_user(user_id)
+ for room_id in room_ids:
+ results[room_id] = tags_by_room[room_id]
+
+ defer.returnValue(results)
+
+ def get_tags_for_room(self, user_id, room_id):
+ """Get all the tags for the given room
+ Args:
+ user_id(str): The user to get tags for
+ room_id(str): The room to get tags for
+ Returns:
+ A deferred list of string tags.
+ """
+ return self._simple_select_onecol(
+ table="room_tags",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ retcol="tag",
+ desc="get_tags_for_room",
+ )
+
+ @defer.inlineCallbacks
+ def add_tag_to_room(self, user_id, room_id, tag):
+ """Add a tag to a room for a user.
+ Returns:
+ A deferred that completes once the tag has been added.
+ """
+ def add_tag_txn(txn, next_id):
+ sql = (
+ "INSERT INTO room_tags (user_id, room_id, tag)"
+ " VALUES (?, ?, ?)"
+ )
+ try:
+ txn.execute(sql, (user_id, room_id, tag))
+ except self.database_engine.module.IntegrityError:
+ # Return early if the row is already in the table
+ # and we don't need to bump the revision number of the
+ # private_user_data.
+ return
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("add_tag", add_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ result = yield self._private_user_data_id_gen.get_max_token(self)
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def remove_tag_from_room(self, user_id, room_id, tag):
+ """Remove a tag from a room for a user.
+ Returns:
+ A deffered that completes once the tag has been removed
+ """
+ def remove_tag_txn(txn, next_id):
+ sql = (
+ "DELETE FROM room_tags "
+ " WHERE user_id = ? AND room_id = ? AND tag = ?"
+ )
+ txn.execute(sql, (user_id, room_id, tag))
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ result = yield self._private_user_data_id_gen.get_max_token(self)
+ defer.returnValue(result)
+
+ def _update_revision_txn(self, txn, user_id, room_id, next_id):
+ """Update the latest revision of the tags for the given user and room.
+
+ Args:
+ txn: The database cursor
+ user_id(str): The ID of the user.
+ room_id(str): The ID of the room.
+ next_id(int): The the revision to advance to.
+ """
+
+ update_max_id_sql = (
+ "UPDATE private_user_data_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(update_max_id_sql, (next_id, next_id))
+
+ update_sql = (
+ "UPDATE room_tags_revisions"
+ " SET stream_id = ?"
+ " WHERE user_id = ?"
+ " AND room_id = ?"
+ )
+ txn.execute(update_sql, (next_id, user_id, room_id))
+
+ if txn.rowcount == 0:
+ insert_sql = (
+ "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)"
+ " VALUES (?, ?, ?)"
+ )
+ try:
+ txn.execute(insert_sql, (user_id, room_id, next_id))
+ except self.database_engine.module.IntegrityError:
+ # Ignore insertion errors. It doesn't matter if the row wasn't
+ # inserted because if two updates happend concurrently the one
+ # with the higher stream_id will not be reported to a client
+ # unless the previous update has completed. It doesn't matter
+ # which stream_id ends up in the table, as long as it is higher
+ # than the id that the client has.
+ pass
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 699083ae12..f0d68b5bf2 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.handlers.receipts import ReceiptEventSource
+from synapse.handlers.private_user_data import PrivateUserDataEventSource
class EventSources(object):
@@ -29,6 +30,7 @@ class EventSources(object):
"presence": PresenceEventSource,
"typing": TypingNotificationEventSource,
"receipt": ReceiptEventSource,
+ "private_user_data": PrivateUserDataEventSource,
}
def __init__(self, hs):
@@ -52,5 +54,8 @@ class EventSources(object):
receipt_key=(
yield self.sources["receipt"].get_current_key()
),
+ private_user_data_key=(
+ yield self.sources["private_user_data"].get_current_key()
+ ),
)
defer.returnValue(token)
diff --git a/synapse/types.py b/synapse/types.py
index 8c51e00e8a..28344d8b36 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -98,10 +98,13 @@ class EventID(DomainSpecificString):
class StreamToken(
- namedtuple(
- "Token",
- ("room_key", "presence_key", "typing_key", "receipt_key")
- )
+ namedtuple("Token", (
+ "room_key",
+ "presence_key",
+ "typing_key",
+ "receipt_key",
+ "private_user_data_key",
+ ))
):
_SEPARATOR = "_"
@@ -109,7 +112,7 @@ class StreamToken(
def from_string(cls, string):
try:
keys = string.split(cls._SEPARATOR)
- if len(keys) == len(cls._fields) - 1:
+ while len(keys) < len(cls._fields):
# i.e. old token from before receipt_key
keys.append("0")
return cls(*keys)
@@ -128,13 +131,14 @@ class StreamToken(
else:
return int(self.room_key[1:].split("-")[-1])
- def is_after(self, other_token):
+ def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
return (
- (other_token.room_stream_id < self.room_stream_id)
- or (int(other_token.presence_key) < int(self.presence_key))
- or (int(other_token.typing_key) < int(self.typing_key))
- or (int(other_token.receipt_key) < int(self.receipt_key))
+ (other.room_stream_id < self.room_stream_id)
+ or (int(other.presence_key) < int(self.presence_key))
+ or (int(other.typing_key) < int(self.typing_key))
+ or (int(other.receipt_key) < int(self.receipt_key))
+ or (int(other.private_user_data_key) < int(self.private_user_data_key))
)
def copy_and_advance(self, key, new_value):
diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py
index 29d9bbaad4..0e3b922246 100644
--- a/tests/rest/client/v1/test_presence.py
+++ b/tests/rest/client/v1/test_presence.py
@@ -369,7 +369,7 @@ class PresenceEventStreamTestCase(unittest.TestCase):
# all be ours
# I'll already get my own presence state change
- self.assertEquals({"start": "0_1_0_0", "end": "0_1_0_0", "chunk": []},
+ self.assertEquals({"start": "0_1_0_0_0", "end": "0_1_0_0_0", "chunk": []},
response
)
@@ -388,7 +388,7 @@ class PresenceEventStreamTestCase(unittest.TestCase):
"/events?from=s0_1_0&timeout=0", None)
self.assertEquals(200, code)
- self.assertEquals({"start": "s0_1_0_0", "end": "s0_2_0_0", "chunk": [
+ self.assertEquals({"start": "s0_1_0_0_0", "end": "s0_2_0_0_0", "chunk": [
{"type": "m.presence",
"content": {
"user_id": "@banana:test",
|