diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 7e767b9bf5..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
"lazy_load_members": {
"type": "boolean"
},
+ "include_redundant_members": {
+ "type": "boolean"
+ },
}
}
@@ -267,6 +270,9 @@ class FilterCollection(object):
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()
+ def include_redundant_members(self):
+ return self._room_state_filter.include_redundant_members()
+
def filter_presence(self, events):
return self._presence_filter.filter(events)
@@ -426,6 +432,9 @@ class Filter(object):
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)
+ def include_redundant_members(self):
+ return self.filter_json.get("include_redundant_members", False)
+
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ced3144c8..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -182,6 +192,12 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
types = None
- lazy_load_members = sync_config.filter_collection.lazy_load_members()
filtered_types = None
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
@@ -523,6 +543,11 @@ class SyncHandler(object):
# only apply the filtering to room members
filtered_types = [EventTypes.Member]
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +568,6 @@ class SyncHandler(object):
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
@@ -571,21 +591,6 @@ class SyncHandler(object):
filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
- # TODO: optionally filter out redundant membership events at this
- # point, to stop repeatedly sending members in every /sync as if
- # the client isn't tracking them.
- # When implemented, this should filter using event_ids (not mxids).
- # In practice, limited syncs are
- # relatively rare so it's not a total disaster to send redundant
- # members down at this point. Redundant members are ones which
- # repeatedly get sent down /sync because we don't know if the client
- # is caching them or not.
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
@@ -596,16 +601,48 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- # TODO: filter out redundant members based on their mxids (not their
- # event_ids) at this point. We know we can do it based on mxid as this
- # is an non-gappy incremental sync.
-
if types:
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
+
state = {}
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index f269ec6fb3..5d3ee90017 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -114,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
- " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
- " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
@@ -330,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
"SELECT depth, prev_event_id FROM event_edges"
" INNER JOIN events"
" ON prev_event_id = events.event_id"
- " AND event_edges.room_id = events.room_id"
- " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)
@@ -365,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for row in txn:
@@ -402,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+ "WHERE event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -411,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
for event_id in front:
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for e_id, in txn:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 200f5ec95f..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -520,7 +520,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -1189,7 +1188,6 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ );
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+SQLite:
+
+ UPDATE events SET content=(
+ SELECT json_extract(json,'$.content') FROM event_json ej
+ WHERE ej.event_id = events.event_id
+ )
+ WHERE
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ )
+ OR stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute("""
+ ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+ """)
+ return
+
+ # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+ cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+ (oldsql,) = cur.fetchone()
+
+ sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+ if sql == oldsql:
+ raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+ logger.info("Replacing definition of 'events' with: %s", sql)
+
+ cur.execute("PRAGMA schema_version")
+ (oldver,) = cur.fetchone()
+ cur.execute("PRAGMA writable_schema=ON")
+ cur.execute(
+ "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+ (sql, ),
+ )
+ cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+ cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
event_id TEXT NOT NULL,
type TEXT NOT NULL,
room_id TEXT NOT NULL,
- content TEXT NOT NULL,
+
+ -- 'content' used to be created NULLable, but as of delta 50 we drop that constraint.
+ -- the hack we use to drop the constraint doesn't work for an in-memory sqlite
+ -- database, which breaks the sytests. Hence, we no longer make it nullable.
+ content TEXT,
+
unrecognized_keys TEXT,
processed BOOL NOT NULL,
outlier BOOL NOT NULL,
|