diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 7e767b9bf5..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
"lazy_load_members": {
"type": "boolean"
},
+ "include_redundant_members": {
+ "type": "boolean"
+ },
}
}
@@ -267,6 +270,9 @@ class FilterCollection(object):
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()
+ def include_redundant_members(self):
+ return self._room_state_filter.include_redundant_members()
+
def filter_presence(self, events):
return self._presence_filter.filter(events)
@@ -426,6 +432,9 @@ class Filter(object):
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)
+ def include_redundant_members(self):
+ return self.filter_json.get("include_redundant_members", False)
+
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
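
For reference, a room event filter exercising the new flag might look like the sketch below (hypothetical values; only the two boolean field names are defined by this schema change):

    # lazy_load_members asks the server to send only the m.room.member events
    # needed to interpret the timeline; include_redundant_members opts out of
    # de-duplicating members the server thinks this client has already seen.
    room_event_filter = {
        "lazy_load_members": True,
        "include_redundant_members": True,
    }
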
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b7e7718290..57b815d777 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -429,7 +429,7 @@ def run(hs):
stats_process = []
def start_phone_stats_home():
- run_as_background_process("phone_stats_home", phone_stats_home)
+ return run_as_background_process("phone_stats_home", phone_stats_home)
@defer.inlineCallbacks
def phone_stats_home():
@@ -502,7 +502,7 @@ def run(hs):
)
def generate_user_daily_visit_stats():
- run_as_background_process(
+ return run_as_background_process(
"generate_user_daily_visits",
hs.get_datastore().generate_user_daily_visits,
)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c9beca27c2..8574898f0c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -404,10 +404,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
- PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
+ PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
- def on_PUT(self, origin, content, query, room_id, txid):
+ def on_PUT(self, origin, content, query, room_id, event_id):
content = yield self.handler.on_send_leave_request(origin, content)
defer.returnValue((200, content))
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 4216af0a27..b04f4234ca 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -153,7 +153,7 @@ class GroupAttestionRenewer(object):
defer.returnValue({})
def _start_renew_attestations(self):
- run_as_background_process("renew_attestations", self._renew_attestations)
+ return run_as_background_process("renew_attestations", self._renew_attestations)
@defer.inlineCallbacks
def _renew_attestations(self):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 43692b83a8..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler):
)
def _start_update_remote_profile_cache(self):
- run_as_background_process(
+ return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 003b848c00..7b7804d9b2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,6 +15,7 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
+import itertools
import logging
import math
import string
@@ -401,7 +402,7 @@ class RoomContextHandler(object):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit):
+ def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -411,6 +412,8 @@ class RoomContextHandler(object):
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
+ event_filter (Filter|None): the filter to apply to the events returned
+ (excluding the target event_id)
Returns:
dict, or None if the event isn't found
@@ -443,7 +446,7 @@ class RoomContextHandler(object):
)
results = yield self.store.get_events_around(
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter
)
results["events_before"] = yield filter_evts(results["events_before"])
@@ -455,8 +458,23 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
+ types = None
+ filtered_types = None
+ if event_filter and event_filter.lazy_load_members():
+ members = set(ev.sender for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ ))
+ filtered_types = [EventTypes.Member]
+ types = [(EventTypes.Member, member) for member in members]
+
+ # XXX: why do we return the state as of the last event rather than the
+ # first? Shouldn't we be consistent with /sync?
+ # https://github.com/matrix-org/matrix-doc/issues/687
+
state = yield self.store.get_state_for_events(
- [last_event_id], None
+ [last_event_id], types, filtered_types=filtered_types,
)
results["state"] = list(state[last_event_id].values())
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a832d91809..0d4a3f4677 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -708,6 +708,10 @@ class RoomMemberHandler(object):
inviter_display_name = member_event.content.get("displayname", "")
inviter_avatar_url = member_event.content.get("avatar_url", "")
+ # if user has no display name, default to their MXID
+ if not inviter_display_name:
+ inviter_display_name = user.to_string()
+
canonical_room_alias = ""
canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
- event.room_id, event.event_id, before_limit, after_limit
+ event.room_id, event.event_id, before_limit, after_limit,
)
logger.info(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ced3144c8..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Each client's entry in the cache that tracks which lazy-loaded members have
+# been sent to it expires after 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client, so that we can avoid
+# redundantly re-sending the same lazy-loaded members to it.
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -182,6 +192,12 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
types = None
- lazy_load_members = sync_config.filter_collection.lazy_load_members()
filtered_types = None
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
@@ -523,6 +543,11 @@ class SyncHandler(object):
# only apply the filtering to room members
filtered_types = [EventTypes.Member]
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +568,6 @@ class SyncHandler(object):
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
@@ -571,21 +591,6 @@ class SyncHandler(object):
filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
- # TODO: optionally filter out redundant membership events at this
- # point, to stop repeatedly sending members in every /sync as if
- # the client isn't tracking them.
- # When implemented, this should filter using event_ids (not mxids).
- # In practice, limited syncs are
- # relatively rare so it's not a total disaster to send redundant
- # members down at this point. Redundant members are ones which
- # repeatedly get sent down /sync because we don't know if the client
- # is caching them or not.
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
@@ -596,16 +601,48 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- # TODO: filter out redundant members based on their mxids (not their
- # event_ids) at this point. We know we can do it based on mxid as this
- # is an non-gappy incremental sync.
-
if types:
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
+
state = {}
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
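
The shape of the de-duplication above, as a minimal standalone sketch (stdlib OrderedDict standing in for synapse's ExpiringCache/LruCache; the class and method names are illustrative):

    from collections import OrderedDict

    class MemberDedupCache(object):
        # remembers the last `max_size` member event_ids sent to one
        # (user, device) pair, mirroring the per-client LruCache above
        def __init__(self, max_size=100):
            self.max_size = max_size
            self.sent = OrderedDict()  # state_key (mxid) -> event_id

        def filter_state(self, state_ids):
            # drop member events whose exact event_id this device already has
            return {
                t: event_id
                for t, event_id in state_ids.items()
                if t[0] != "m.room.member" or self.sent.get(t[1]) != event_id
            }

        def record(self, t, event_id):
            # called for every state/timeline entry we are about to send
            if t[0] != "m.room.member":
                return
            self.sent.pop(t[1], None)
            self.sent[t[1]] = event_id           # most-recently-sent at the end
            if len(self.sent) > self.max_size:
                self.sent.popitem(last=False)    # evict least-recently-sent

As in the handler, an initial sync (since_token is None) would clear the cache rather than filter, since the client is assumed to be starting from scratch.
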
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 9d820e44a6..ce678d5f75 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -151,13 +151,19 @@ def run_as_background_process(desc, func, *args, **kwargs):
This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request.
+ It returns a Deferred which completes when the function completes, but it doesn't
+ follow the synapse logcontext rules, which makes it appropriate for passing to
+ clock.looping_call and friends (or for firing-and-forgetting in the middle of a
+ normal synapse inlineCallbacks function).
+
Args:
desc (str): a description for this background process type
func: a function, which may return a Deferred
args: positional args for func
kwargs: keyword args for func
- Returns: None
+ Returns: Deferred which returns the result of func, but note that it does not
+ follow the synapse logcontext rules.
"""
@defer.inlineCallbacks
def run():
@@ -176,4 +182,4 @@ def run_as_background_process(desc, func, *args, **kwargs):
_background_processes[desc].remove(proc)
with PreserveLoggingContext():
- run()
+ return run()
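
This is why the various _start_* wrappers in this diff now return the result: when such a wrapper is driven by twisted's LoopingCall (via synapse's clock.looping_call), returning the Deferred lets the loop wait for completion and surface failures instead of silently dropping them. A minimal sketch of the calling convention (names are illustrative):

    def _start_cleanup(self):
        # returning the Deferred lets the scheduler observe completion/failure
        return run_as_background_process("cleanup", self._cleanup)

    # e.g. run every 30 seconds:
    self.clock.looping_call(self._start_cleanup, 30 * 1000)
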
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b7bd878c90..13c331550b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -531,11 +531,20 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
+ # picking the API shape for symmetry with /messages
+ filter_bytes = parse_string(request, "filter")
+ if filter_bytes:
+ filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
+ event_filter = Filter(json.loads(filter_json))
+ else:
+ event_filter = None
+
results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
limit,
+ event_filter,
)
if not results:
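
For illustration, the new query parameter carries the same URL-encoded filter JSON as /messages; a client-side request might be assembled like this (a hedged sketch in the python 2 idiom the handler uses; the identifiers are example values):

    import json
    import urllib

    room_id = "!room:example.org"
    event_id = "$12345abcde:example.org"
    filter_json = json.dumps({"lazy_load_members": True})
    path = "/_matrix/client/r0/rooms/%s/context/%s?limit=10&filter=%s" % (
        urllib.quote(room_id, safe=""),
        urllib.quote(event_id, safe=""),
        urllib.quote(filter_json, safe=""),
    )
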
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 5b13378caa..174ad20123 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -106,7 +106,7 @@ class MediaRepository(object):
)
def _start_update_recently_accessed(self):
- run_as_background_process(
+ return run_as_background_process(
"update_recently_accessed_media", self._update_recently_accessed,
)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 4efd5339a4..27aa0def2f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -373,7 +373,7 @@ class PreviewUrlResource(Resource):
})
def _start_expire_url_cache_data(self):
- run_as_background_process(
+ return run_as_background_process(
"expire_url_cache_data", self._expire_url_cache_data,
)
diff --git a/synapse/secrets.py b/synapse/secrets.py
index f397daaa5e..f05e9ea535 100644
--- a/synapse/secrets.py
+++ b/synapse/secrets.py
@@ -20,17 +20,16 @@ See https://docs.python.org/3/library/secrets.html#module-secrets for the API
used in Python 3.6, and the API emulated in Python 2.7.
"""
-import six
+import sys
-if six.PY3:
+# secrets is available since python 3.6
+if sys.version_info[0:2] >= (3, 6):
import secrets
def Secrets():
return secrets
-
else:
-
import os
import binascii
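
The py2 branch (its body is outside this hunk) emulates the same two-method surface on top of the os/binascii imports above; sketched from the module docstring, it is roughly:

    class Secrets(object):
        def token_bytes(self, nbytes=32):
            return os.urandom(nbytes)

        def token_hex(self, nbytes=32):
            return binascii.hexlify(self.token_bytes(nbytes))
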
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 77ae10da3d..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
to_update,
)
- run_as_background_process(
+ return run_as_background_process(
"update_client_ips", update,
)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 52dccb1507..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore):
logger.info("Pruned %d device list outbound pokes", txn.rowcount)
- run_as_background_process(
+ return run_as_background_process(
"prune_old_outbound_device_pokes",
self.runInteraction,
"_prune_old_outbound_device_pokes",
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 14500ee59c..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -114,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
- " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
- " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
@@ -330,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
"SELECT depth, prev_event_id FROM event_edges"
" INNER JOIN events"
" ON prev_event_id = events.event_id"
- " AND event_edges.room_id = events.room_id"
- " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)
@@ -365,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for row in txn:
@@ -402,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+ "WHERE event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -411,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
for event_id in front:
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for e_id, in txn:
@@ -549,7 +548,7 @@ class EventFederationStore(EventFederationWorkerStore):
sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
- run_as_background_process(
+ return run_as_background_process(
"delete_old_forward_extrem_cache",
self.runInteraction,
"_delete_old_forward_extrem_cache",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4f44b0ad47..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
)
def _find_stream_orderings_for_times(self):
- run_as_background_process(
+ return run_as_background_process(
"event_push_action_stream_orderings",
self.runInteraction,
"_find_stream_orderings_for_times",
@@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
""", (room_id, user_id, stream_ordering))
def _start_rotate_notifs(self):
- run_as_background_process("rotate_notifs", self._rotate_notifs)
+ return run_as_background_process("rotate_notifs", self._rotate_notifs)
@defer.inlineCallbacks
def _rotate_notifs(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a32a306495..c98e524ba1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -524,7 +524,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -1193,7 +1192,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ );
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+SQLite:
+
+ UPDATE events SET content=(
+ SELECT json_extract(json,'$.content') FROM event_json ej
+ WHERE ej.event_id = events.event_id
+ )
+ WHERE
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ )
+ OR stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute("""
+ ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+ """)
+ return
+
+ # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+ cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+ (oldsql,) = cur.fetchone()
+
+ sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+ if sql == oldsql:
+ raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+ logger.info("Replacing definition of 'events' with: %s", sql)
+
+ cur.execute("PRAGMA schema_version")
+ (oldver,) = cur.fetchone()
+ cur.execute("PRAGMA writable_schema=ON")
+ cur.execute(
+ "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+ (sql, ),
+ )
+ cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+ cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
index 52eec88357..6b5a5a88fa 100644
--- a/synapse/storage/schema/full_schemas/16/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges(
event_id TEXT NOT NULL,
prev_event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- is_state BOOL NOT NULL,
+ is_state BOOL NOT NULL, -- true if this is a prev_state edge rather than a regular
+ -- event dag edge.
UNIQUE (event_id, prev_event_id, room_id, is_state)
);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
event_id TEXT NOT NULL,
type TEXT NOT NULL,
room_id TEXT NOT NULL,
- content TEXT NOT NULL,
+
+    -- 'content' used to be created NOT NULL, but as of delta 50 we drop that
+    -- constraint. The hack we use to drop it doesn't work for an in-memory sqlite
+    -- database (which breaks the sytests), so we create the column nullable here.
+ content TEXT,
+
unrecognized_keys TEXT,
processed BOOL NOT NULL,
outlier BOOL NOT NULL,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 9d85dbb1d6..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -527,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ def get_events_around(
+ self, room_id, event_id, before_limit, after_limit, event_filter=None,
+ ):
"""Retrieve events and pagination tokens around a given event in a
room.
@@ -536,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -543,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results = yield self.runInteraction(
"get_events_around", self._get_events_around_txn,
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter,
)
events_before = yield self._get_events(
@@ -563,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"end": results["after"]["token"],
})
- def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ def _get_events_around_txn(
+ self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+ ):
"""Retrieves event_ids and pagination tokens around a given event in a
room.
@@ -572,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -601,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
+ event_filter=event_filter,
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
+ event_filter=event_filter,
)
events_after = [r.event_id for r in rows]
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b4b479d94c..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore):
return self.cursor_to_dict(txn)
def _start_cleanup_transactions(self):
- run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+ return run_as_background_process(
+ "cleanup_transactions", self._cleanup_transactions,
+ )
def _cleanup_transactions(self):
now = self._clock.time_msec()
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 465adc54a8..ce85b2ae11 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -64,7 +64,7 @@ class ExpiringCache(object):
return
def f():
- run_as_background_process(
+ return run_as_background_process(
"prune_cache_%s" % self._cache_name,
self._prune_cache,
)