diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..d06a05acd9 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -41,6 +41,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -75,6 +76,7 @@ class SynchrotronSlavedStore(
SlavedRegistrationStore,
SlavedFilteringStore,
SlavedPresenceStore,
+ SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedClientIpStore,
@@ -409,6 +411,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
+ elif stream_name == "receipts":
+ self.notifier.on_new_event(
+ "groups_key", token, users=[row.user_id for row in rows],
+ )
def start(config_options):
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 0b80348c82..4182ea5afa 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -211,13 +211,16 @@ class GroupsLocalHandler(object):
user_id=user_id,
)
- yield self.store.register_user_group_membership(
+ token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
defer.returnValue({})
@@ -257,11 +260,14 @@ class GroupsLocalHandler(object):
if "avatar_url" in content["profile"]:
local_profile["avatar_url"] = content["profile"]["avatar_url"]
- yield self.store.register_user_group_membership(
+ token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="invite",
content={"profile": local_profile, "inviter": content["inviter"]},
)
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
defer.returnValue({"state": "invite"})
@@ -270,10 +276,13 @@ class GroupsLocalHandler(object):
"""Remove a user from a group
"""
if user_id == requester_user_id:
- yield self.store.register_user_group_membership(
+ token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
# TODO: Should probably remember that we tried to leave so that we can
# retry if the group server is currently down.
@@ -296,10 +305,13 @@ class GroupsLocalHandler(object):
"""One of our users was removed/kicked from a group
"""
# TODO: Check if user in group
- yield self.store.register_user_group_membership(
+ token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="leave",
)
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
@defer.inlineCallbacks
def get_joined_groups(self, user_id):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
new file mode 100644
index 0000000000..0bc4bce5b0
--- /dev/null
+++ b/synapse/replication/slave/storage/groups.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage import DataStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class SlavedGroupServerStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedGroupServerStore, self).__init__(db_conn, hs)
+
+ self.hs = hs
+
+ self._group_updates_id_gen = SlavedIdTracker(
+ db_conn, "local_group_updates", "stream_id",
+ )
+ self._group_updates_stream_cache = StreamChangeCache(
+ "_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
+ )
+
+ get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
+ get_group_stream_token = DataStore.get_group_stream_token.__func__
+ get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
+
+ def stream_positions(self):
+ result = super(SlavedGroupServerStore, self).stream_positions()
+ result["groups"] = self._group_updates_id_gen.get_current_token()
+ return result
+
+ def process_replication_rows(self, stream_name, token, rows):
+ if stream_name == "groups":
+ self._group_updates_id_gen.advance(token)
+ for row in rows:
+ self._group_updates_stream_cache.entity_has_changed(
+ row.user_id, token
+ )
+
+ return super(SlavedGroupServerStore, self).process_replication_rows(
+ stream_name, token, rows
+ )
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index fbafe12cc2..4c60bf79f9 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -118,6 +118,12 @@ CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
"state_key", # str
"event_id", # str, optional
))
+GroupsStreamRow = namedtuple("GroupsStreamRow", (
+ "group_id", # str
+ "user_id", # str
+ "type", # str
+ "content", # dict
+))
class Stream(object):
@@ -464,6 +470,19 @@ class CurrentStateDeltaStream(Stream):
super(CurrentStateDeltaStream, self).__init__(hs)
+class GroupServerStream(Stream):
+ NAME = "groups"
+ ROW_TYPE = GroupsStreamRow
+
+ def __init__(self, hs):
+ store = hs.get_datastore()
+
+ self.current_token = store.get_group_stream_token
+ self.update_function = store.get_all_groups_changes
+
+ super(GroupServerStream, self).__init__(hs)
+
+
STREAMS_MAP = {
stream.NAME: stream
for stream in (
@@ -482,5 +501,6 @@ STREAMS_MAP = {
TagAccountDataStream,
AccountDataStream,
CurrentStateDeltaStream,
+ GroupServerStream,
)
}
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index 45f0a4c599..5006ac863f 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -853,6 +853,8 @@ class GroupServerStore(SQLBaseStore):
},
)
+ return next_id
+
with self._group_updates_id_gen.get_next() as next_id:
yield self.runInteraction(
"register_user_group_membership",
@@ -993,5 +995,26 @@ class GroupServerStore(SQLBaseStore):
"get_groups_changes_for_user", _get_groups_changes_for_user_txn,
)
+ def get_all_groups_changes(self, from_token, to_token, limit):
+ from_token = int(from_token)
+ has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+ from_token,
+ )
+ if not has_changed:
+ return []
+
+ def _get_all_groups_changes_txn(txn):
+ sql = """
+ SELECT stream_id, group_id, user_id, type, content
+ FROM local_group_updates
+ WHERE ? < stream_id AND stream_id <= ?
+ LIMIT ?
+ """
+ txn.execute(sql, (from_token, to_token, limit,))
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_groups_changes", _get_all_groups_changes_txn,
+ )
+
def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()
|