diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index 5831ff0e62..a108132346 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -22,6 +22,7 @@ from . import (
receipts,
keys,
tokenrefresh,
+ tags,
)
from synapse.http.server import JsonResource
@@ -44,3 +45,4 @@ class ClientV2AlphaRestResource(JsonResource):
receipts.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
+ tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py
new file mode 100644
index 0000000000..c4a670c5a7
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/tags.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import client_v2_pattern
+
+from synapse.http.servlet import RestServlet
+
+from twisted.internet import defer
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagListServlet(RestServlet):
+ """
+ GET /user/{user_id}/rooms/{room_id}/tags HTTP/1.1
+ """
+ PATTERN = client_v2_pattern(
+ "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags"
+ )
+
+ def __init__(self, hs):
+ super(TagListServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id, room_id):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot get tags for other users.")
+
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+ defer.returnValue((200, {"tags": tags}))
+
+
+class TagServlet(RestServlet):
+ """
+ PUT /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+ DELETE /user/{user_id}/rooms/{room_id}/tags/{tag} HTTP/1.1
+ """
+ PATTERN = client_v2_pattern(
+ "/user/(?P<user_id>[^/]*)/rooms/(?P<room_id>[^/]*)/tags/(?P<tag>[^/]*)"
+ )
+ def __init__(self, hs):
+ super(TagServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id, room_id, tag):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot add tags for other users.")
+
+ yield self.store.add_tag_to_room(user_id, room_id, tag)
+
+ # TODO: poke the notifier.
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, user_id, room_id, tag):
+ auth_user, _ = yield self.auth.get_user_by_req(request)
+ if user_id != auth_user.to_string():
+ raise AuthError(403, "Cannot add tags for other users.")
+
+ yield self.store.remove_tag_from_room(user_id, room_id, tag)
+
+ # TODO: poke the notifier.
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ TagListServlet(hs).register(http_server)
+ TagServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a1bd9c4ce9..e7443f2838 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -41,6 +41,7 @@ from .end_to_end_keys import EndToEndKeyStore
from .receipts import ReceiptsStore
from .search import SearchStore
+from .tags import TagsStore
import logging
@@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore,
ReceiptsStore,
EndToEndKeyStore,
SearchStore,
+ TagsStore,
):
def __init__(self, hs):
diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql
new file mode 100644
index 0000000000..168766dcf3
--- /dev/null
+++ b/synapse/storage/schema/delta/25/tags.sql
@@ -0,0 +1,37 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS room_tags(
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ tag TEXT NOT NULL, -- The name of the tag.
+ CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag)
+);
+
+CREATE TABLE IF NOT EXISTS room_tags_revisions (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ stream_id BIGINT NOT NULL, -- The current version of the room tags.
+ CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id)
+);
+
+CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0);
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
new file mode 100644
index 0000000000..507e60596f
--- /dev/null
+++ b/synapse/storage/tags.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
+from twisted.internet import defer
+from .util.id_generators import StreamIdGenerator
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagsStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(TagsStore, self).__init__(hs)
+
+ self._private_user_data_id_gen = StreamIdGenerator(
+ "private_user_data_max_stream_id", "stream_id"
+ )
+
+ @cached()
+ def get_tags_for_user(self, user_id):
+ """Get all the tags for a user.
+
+
+ Args:
+ user_id(str): The user to get the tags for.
+ Returns:
+ A deferred dict mapping from room_id strings to lists of tag
+ strings.
+ """
+
+ deferred = self._simple_select_list(
+ "room_tags", {"user_id": user_id}, ["room_id", "tag"]
+ )
+
+ @deferred.addCallback
+ def tags_by_room(rows):
+ tags_by_room = {}
+ for row in rows:
+ tags_by_room.setdefault(row["room_id"], []).append(row["tag"])
+ return tags_by_room
+
+ return deferred
+
+ @defer.inlineCallbacks
+ def get_updated_tags(self, user_id, stream_id):
+ """Get all the tags for the rooms where the tags have changed since the
+ given version
+
+ Args:
+ user_id(str): The user to get the tags for.
+ stream_id(int): The earliest update to get for the user.
+ Returns:
+ A deferred dict mapping from room_id strings to lists of tag
+ strings for all the rooms that changed since the stream_id token.
+ """
+ def get_updated_tags_txn(txn):
+ sql = (
+ "SELECT room_id from room_tags_revisions"
+ " WHERE user_id = ? AND stream_id > ?"
+ )
+ txn.execute(sql, (user_id, stream_id))
+ room_ids = [row[0] for row in txn.fetchall()]
+ return room_ids
+
+ room_ids = yield self.runInteraction(
+ "get_updated_tags", get_updated_tags_txn
+ )
+
+ results = {}
+ if room_ids:
+ tags_by_room = yield self.get_tags_for_user(self, user_id)
+ for room_id in rooms_ids:
+ results[room_id] = tags_by_room[room_id]
+
+ defer.returnValue(results)
+
+ def get_tags_for_room(self, user_id, room_id):
+ """Get all the tags for the given room
+ Args:
+ user_id(str): The user to get tags for
+ room_id(str): The room to get tags for
+ Returns:
+ A deferred list of string tags.
+ """
+ return self._simple_select_onecol(
+ table="room_tags",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ retcol="tag",
+ desc="get_tags_for_room",
+ )
+
+ @defer.inlineCallbacks
+ def add_tag_to_room(self, user_id, room_id, tag):
+ """Add a tag to a room for a user.
+ Returns:
+ A deferred that completes once the tag has been added.
+ """
+ def add_tag_txn(txn, next_id):
+ sql = (
+ "INSERT INTO room_tags (user_id, room_id, tag)"
+ " VALUES (?, ?, ?)"
+ )
+ try:
+ txn.execute(sql, (user_id, room_id, tag))
+ except database_engine.module.IntegrityError as e:
+ # Return early if the row is already in the table
+ # and we don't need to bump the revision number of the
+ # private_user_data.
+ return
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("add_tag", add_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ @defer.inlineCallbacks
+ def remove_tag_from_room(self, user_id, room_id, tag):
+ """Remove a tag from a room for a user.
+ Returns:
+ A deffered that completes once the tag has been removed
+ """
+ def remove_tag_txn(txn, next_id):
+ sql = (
+ "DELETE FROM room_tags "
+ " WHERE user_id = ? AND room_id = ? AND tag = ?"
+ )
+ txn.execute(sql, (user_id, room_id, tag))
+ self._update_revision_txn(txn, user_id, room_id, next_id)
+
+ with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+ yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
+
+ self.get_tags_for_user.invalidate((user_id,))
+
+ def _update_revision_txn(self, txn, user_id, room_id, next_id):
+ """Update the latest revision of the tags for the given user and room.
+
+ Args:
+ txn: The database cursor
+ user_id(str): The ID of the user.
+ room_id(str): The ID of the room.
+ next_id(int): The the revision to advance to.
+ """
+
+ update_max_id_sql = (
+ "UPDATE private_user_data_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(update_max_id_sql, (next_id, next_id))
+
+ update_sql = (
+ "UPDATE room_tags_revisions"
+ " SET stream_id = ?"
+ " WHERE user_id = ?"
+ " AND room_id = ?"
+ )
+ txn.execute(update_sql, (next_id, user_id, room_id))
+
+ if txn.rowcount == 0:
+ insert_sql = (
+ "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)"
+ " VALUES (?, ?, ?)"
+ )
+ try:
+ txn.execute(insert_sql, (user_id, room_id, next_id))
+ except database_engine.module.IntegrityError as e:
+ # Ignore insertion errors. It doesn't matter if the row wasn't
+ # inserted because if two updates happend concurrently the one
+ # with the higher stream_id will not be reported to a client
+ # unless the previous update has completed. It doesn't matter
+ # which stream_id ends up in the table, as long as it is higher
+ # than the id that the client has.
+ pass
|