summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-07-10 14:53:19 +0100
committerErik Johnston <erik@matrix.org>2017-07-20 16:36:42 +0100
commitc544188ee3644c85a97a3c4e09e63ad4e3c6f0cc (patch)
tree6e8f40f857d42827b2fef0c1f6e150480ff870f3 /synapse/storage
parentMerge pull request #2374 from matrix-org/erikj/group_server_local (diff)
downloadsynapse-c544188ee3644c85a97a3c4e09e63ad4e3c6f0cc.tar.xz
Add groups to sync stream
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py15
-rw-r--r--synapse/storage/group_server.py68
-rw-r--r--synapse/storage/schema/delta/43/group_server.sql9
3 files changed, 87 insertions, 5 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index fdee9f1ad5..594566eb38 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -136,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "pushers", "id",
             extra_tables=[("deleted_pushers", "stream_id")],
         )
+        self._group_updates_id_gen = StreamIdGenerator(
+            db_conn, "local_group_updates", "stream_id",
+        )
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
@@ -236,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=curr_state_delta_prefill,
         )
 
+        _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
+            db_conn, "local_group_updates",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=self._group_updates_id_gen.get_current_token(),
+            limit=1000,
+        )
+        self._group_updates_stream_cache = StreamChangeCache(
+            "_group_updates_stream_cache", min_group_updates_id,
+            prefilled_cache=_group_updates_prefill,
+        )
+
         cur = LoggingTransaction(
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index a2e7aa47d8..45f0a4c599 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -776,7 +776,7 @@ class GroupServerStore(SQLBaseStore):
             remote_attestation (dict): If remote group then store the remote
                 attestation from the group, else None.
         """
-        def _register_user_group_membership_txn(txn):
+        def _register_user_group_membership_txn(txn, next_id):
             # TODO: Upsert?
             self._simple_delete_txn(
                 txn,
@@ -798,6 +798,19 @@ class GroupServerStore(SQLBaseStore):
                 },
             )
 
+            self._simple_insert_txn(
+                txn,
+                table="local_group_updates",
+                values={
+                    "stream_id": next_id,
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "type": "membership",
+                    "content": json.dumps({"membership": membership, "content": content}),
+                }
+            )
+            self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
             # TODO: Insert profile to ensure it comes down stream if its a join.
 
             if membership == "join":
@@ -840,10 +853,11 @@ class GroupServerStore(SQLBaseStore):
                     },
                 )
 
-        yield self.runInteraction(
-            "register_user_group_membership",
-            _register_user_group_membership_txn,
-        )
+        with self._group_updates_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "register_user_group_membership",
+                _register_user_group_membership_txn, next_id,
+            )
 
     @defer.inlineCallbacks
     def create_group(self, group_id, user_id, name, avatar_url, short_description,
@@ -937,3 +951,47 @@ class GroupServerStore(SQLBaseStore):
             retcol="group_id",
             desc="get_joined_groups",
         )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token,))
+            return self.cursor_to_dict(txn)
+        return self.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn,
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token,))
+            return [{
+                "group_id": group_id,
+                "membership": membership,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for group_id, membership, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+        )
+
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()
diff --git a/synapse/storage/schema/delta/43/group_server.sql b/synapse/storage/schema/delta/43/group_server.sql
index e1fd47aa7f..92f3339c94 100644
--- a/synapse/storage/schema/delta/43/group_server.sql
+++ b/synapse/storage/schema/delta/43/group_server.sql
@@ -155,3 +155,12 @@ CREATE TABLE local_group_membership (
 
 CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
 CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);
+
+
+CREATE TABLE local_group_updates (
+    stream_id BIGINT NOT NULL,
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    content TEXT NOT NULL
+);