diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 9ba5778a88..0e3dd4e9ca 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -62,6 +62,13 @@ class EndToEndKeyBackgroundStore(SQLBaseStore):
class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+ super().__init__(database, db_conn, hs)
+
+ self._allow_device_name_lookup_over_federation = (
+ self.hs.config.federation.allow_device_name_lookup_over_federation
+ )
+
async def get_e2e_device_keys_for_federation_query(
self, user_id: str
) -> Tuple[int, List[JsonDict]]:
@@ -85,7 +92,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
result["keys"] = keys
device_display_name = None
- if self.hs.config.allow_device_name_lookup_over_federation:
+ if self._allow_device_name_lookup_over_federation:
device_display_name = device.display_name
if device_display_name:
result["device_display_name"] = device_display_name
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ff81d5cd17..c0ea445550 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,7 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Set, Tuple
+from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -670,8 +671,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return dict(txn)
- async def get_max_depth_of(self, event_ids: List[str]) -> int:
- """Returns the max depth of a set of event IDs
+ async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
+ """Returns the event ID and depth for the event that has the max depth from a set of event IDs
Args:
event_ids: The event IDs to calculate the max depth of.
@@ -680,14 +681,53 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
table="events",
column="event_id",
iterable=event_ids,
- retcols=("depth",),
+ retcols=(
+ "event_id",
+ "depth",
+ ),
desc="get_max_depth_of",
)
if not rows:
- return 0
+ return None, 0
else:
- return max(row["depth"] for row in rows)
+ max_depth_event_id = ""
+ current_max_depth = 0
+ for row in rows:
+ if row["depth"] > current_max_depth:
+ max_depth_event_id = row["event_id"]
+ current_max_depth = row["depth"]
+
+ return max_depth_event_id, current_max_depth
+
+ async def get_min_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
+ """Returns the event ID and depth for the event that has the min depth from a set of event IDs
+
+ Args:
+ event_ids: The event IDs to calculate the max depth of.
+ """
+ rows = await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_min_depth_of",
+ )
+
+ if not rows:
+ return None, 0
+ else:
+ min_depth_event_id = ""
+ current_min_depth = MAX_DEPTH
+ for row in rows:
+ if row["depth"] < current_min_depth:
+ min_depth_event_id = row["event_id"]
+ current_min_depth = row["depth"]
+
+ return min_depth_event_id, current_min_depth
async def get_prev_events_for_room(self, room_id: str) -> List[str]:
"""
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 5fc3bb5a7d..2796354a1f 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -90,7 +90,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
60 * 1000,
)
self.hs.get_clock().call_later(
- 1000,
+ 1,
self._count_known_servers,
)
LaterGauge(
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 33dc752d8f..051095fea9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -16,9 +16,24 @@
import itertools
import logging
-from collections import deque, namedtuple
-from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
+from collections import deque
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Collection,
+ Deque,
+ Dict,
+ Generic,
+ Iterable,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ TypeVar,
+)
+import attr
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -26,6 +41,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.logging import opentracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
@@ -37,7 +53,7 @@ from synapse.types import (
StateMap,
get_domain_from_id,
)
-from synapse.util.async_helpers import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -89,25 +105,53 @@ times_pruned_extremities = Counter(
)
-class _EventPeristenceQueue:
+@attr.s(auto_attribs=True, slots=True)
+class _EventPersistQueueItem:
+ events_and_contexts: List[Tuple[EventBase, EventContext]]
+ backfilled: bool
+ deferred: ObservableDeferred
+
+ parent_opentracing_span_contexts: List = attr.ib(factory=list)
+ """A list of opentracing spans waiting for this batch"""
+
+ opentracing_span_context: Any = None
+ """The opentracing span under which the persistence actually happened"""
+
+
+_PersistResult = TypeVar("_PersistResult")
+
+
+class _EventPeristenceQueue(Generic[_PersistResult]):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
"""
- _EventPersistQueueItem = namedtuple(
- "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
- )
+ def __init__(
+ self,
+ per_item_callback: Callable[
+ [List[Tuple[EventBase, EventContext]], bool],
+ Awaitable[_PersistResult],
+ ],
+ ):
+ """Create a new event persistence queue
- def __init__(self):
- self._event_persist_queues = {}
- self._currently_persisting_rooms = set()
+ The per_item_callback will be called for each item added via add_to_queue,
+ and its result will be returned via the Deferreds returned from add_to_queue.
+ """
+ self._event_persist_queues: Dict[str, Deque[_EventPersistQueueItem]] = {}
+ self._currently_persisting_rooms: Set[str] = set()
+ self._per_item_callback = per_item_callback
- def add_to_queue(self, room_id, events_and_contexts, backfilled):
+ async def add_to_queue(
+ self,
+ room_id: str,
+ events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
+ backfilled: bool,
+ ) -> _PersistResult:
"""Add events to the queue, with the given persist_event options.
- NB: due to the normal usage pattern of this method, it does *not*
- follow the synapse logcontext rules, and leaves the logcontext in
- place whether or not the returned deferred is ready.
+ If we are not already processing events in this room, starts off a background
+ process to to so, calling the per_item_callback for each item.
Args:
room_id (str):
@@ -115,38 +159,54 @@ class _EventPeristenceQueue:
backfilled (bool):
Returns:
- defer.Deferred: a deferred which will resolve once the events are
- persisted. Runs its callbacks *without* a logcontext. The result
- is the same as that returned by the callback passed to
- `handle_queue`.
+ the result returned by the `_per_item_callback` passed to
+ `__init__`.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
- if queue:
- # if the last item in the queue has the same `backfilled` setting,
- # we can just add these new events to that item.
- end_item = queue[-1]
- if end_item.backfilled == backfilled:
- end_item.events_and_contexts.extend(events_and_contexts)
- return end_item.deferred.observe()
- deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
+ # if the last item in the queue has the same `backfilled` setting,
+ # we can just add these new events to that item.
+ if queue and queue[-1].backfilled == backfilled:
+ end_item = queue[-1]
+ else:
+ # need to make a new queue item
+ deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
- queue.append(
- self._EventPersistQueueItem(
- events_and_contexts=events_and_contexts,
+ end_item = _EventPersistQueueItem(
+ events_and_contexts=[],
backfilled=backfilled,
deferred=deferred,
)
- )
+ queue.append(end_item)
+
+ # add our events to the queue item
+ end_item.events_and_contexts.extend(events_and_contexts)
+
+ # also add our active opentracing span to the item so that we get a link back
+ span = opentracing.active_span()
+ if span:
+ end_item.parent_opentracing_span_contexts.append(span.context)
+
+ # start a processor for the queue, if there isn't one already
+ self._handle_queue(room_id)
+
+ # wait for the queue item to complete
+ res = await make_deferred_yieldable(end_item.deferred.observe())
- return deferred.observe()
+ # add another opentracing span which links to the persist trace.
+ with opentracing.start_active_span_follows_from(
+ "persist_event_batch_complete", (end_item.opentracing_span_context,)
+ ):
+ pass
+
+ return res
- def handle_queue(self, room_id, per_item_callback):
+ def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
- The given callback will be invoked with for each item in the queue,
+ The queue's callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
- be called with new items, unless the queue becomnes empty. The return
+ be called with new items, unless the queue becomes empty. The return
value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferreds as well.
@@ -156,7 +216,6 @@ class _EventPeristenceQueue:
If another callback is currently handling the queue then it will not be
invoked.
"""
-
if room_id in self._currently_persisting_rooms:
return
@@ -167,7 +226,17 @@ class _EventPeristenceQueue:
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- ret = await per_item_callback(item)
+ with opentracing.start_active_span_follows_from(
+ "persist_event_batch",
+ item.parent_opentracing_span_contexts,
+ inherit_force_tracing=True,
+ ) as scope:
+ if scope:
+ item.opentracing_span_context = scope.span.context
+
+ ret = await self._per_item_callback(
+ item.events_and_contexts, item.backfilled
+ )
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
@@ -214,9 +283,10 @@ class EventsPersistenceStorage:
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
- self._event_persist_queue = _EventPeristenceQueue()
+ self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
self._state_resolution_handler = hs.get_state_resolution_handler()
+ @opentracing.trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -241,26 +311,21 @@ class EventsPersistenceStorage:
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
- deferreds = []
- for room_id, evs_ctxs in partitioned.items():
- d = self._event_persist_queue.add_to_queue(
+ async def enqueue(item):
+ room_id, evs_ctxs = item
+ return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
)
- deferreds.append(d)
- for room_id in partitioned:
- self._maybe_start_persisting(room_id)
+ ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
- # Each deferred returns a map from event ID to existing event ID if the
- # event was deduplicated. (The dict may also include other entries if
+ # Each call to add_to_queue returns a map from event ID to existing event ID if
+ # the event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events).
#
- # Since we use `defer.gatherResults` we need to merge the returned list
+ # Since we use `yieldable_gather_results` we need to merge the returned list
# of dicts into one.
- ret_vals = await make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True)
- )
- replaced_events = {}
+ replaced_events: Dict[str, str] = {}
for d in ret_vals:
replaced_events.update(d)
@@ -277,6 +342,7 @@ class EventsPersistenceStorage:
self.main_store.get_room_max_token(),
)
+ @opentracing.trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
@@ -287,16 +353,12 @@ class EventsPersistenceStorage:
event if it was deduplicated due to an existing event matching the
transaction ID.
"""
- deferred = self._event_persist_queue.add_to_queue(
- event.room_id, [(event, context)], backfilled=backfilled
- )
-
- self._maybe_start_persisting(event.room_id)
-
- # The deferred returns a map from event ID to existing event ID if the
+ # add_to_queue returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events.)
- replaced_events = await make_deferred_yieldable(deferred)
+ replaced_events = await self._event_persist_queue.add_to_queue(
+ event.room_id, [(event, context)], backfilled=backfilled
+ )
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
@@ -308,29 +370,14 @@ class EventsPersistenceStorage:
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()
- def _maybe_start_persisting(self, room_id: str):
- """Pokes the `_event_persist_queue` to start handling new items in the
- queue, if not already in progress.
-
- Causes the deferreds returned by `add_to_queue` to resolve with: a
- dictionary of event ID to event ID we didn't persist as we already had
- another event persisted with the same TXN ID.
- """
-
- async def persisting_queue(item):
- with Measure(self._clock, "persist_events"):
- return await self._persist_events(
- item.events_and_contexts, backfilled=item.backfilled
- )
-
- self._event_persist_queue.handle_queue(room_id, persisting_queue)
-
- async def _persist_events(
+ async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> Dict[str, str]:
- """Calculates the change to current state and forward extremities, and
+ """Callback for the _event_persist_queue
+
+ Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
Returns:
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3799d46734..683e5e3b90 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -1,5 +1,4 @@
-# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2014 - 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,7 +25,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines.postgres import PostgresEngine
-from synapse.storage.schema import SCHEMA_VERSION
+from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
@@ -59,6 +58,28 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
)
+@attr.s
+class _SchemaState:
+ current_version: int = attr.ib()
+ """The current schema version of the database"""
+
+ compat_version: Optional[int] = attr.ib()
+ """The SCHEMA_VERSION of the oldest version of Synapse for this database
+
+ If this is None, we have an old version of the database without the necessary
+ table.
+ """
+
+ applied_deltas: Collection[str] = attr.ib(factory=tuple)
+ """Any delta files for `current_version` which have already been applied"""
+
+ upgraded: bool = attr.ib(default=False)
+ """Whether the current state was reached by applying deltas.
+
+ If False, we have run the full schema for `current_version`, and have applied no
+ deltas since. If True, we have run some deltas since the original creation."""
+
+
def prepare_database(
db_conn: LoggingDatabaseConnection,
database_engine: BaseDatabaseEngine,
@@ -96,12 +117,11 @@ def prepare_database(
version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
- user_version, delta_files, upgraded = version_info
logger.info(
"%r: Existing schema is %i (+%i deltas)",
databases,
- user_version,
- len(delta_files),
+ version_info.current_version,
+ len(version_info.applied_deltas),
)
# config should only be None when we are preparing an in-memory SQLite db,
@@ -113,16 +133,18 @@ def prepare_database(
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
- if config.worker_app is not None and user_version != SCHEMA_VERSION:
+ if (
+ config.worker_app is not None
+ and version_info.current_version != SCHEMA_VERSION
+ ):
raise UpgradeDatabaseException(
- OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
+ OUTDATED_SCHEMA_ON_WORKER_ERROR
+ % (SCHEMA_VERSION, version_info.current_version)
)
_upgrade_existing_database(
cur,
- user_version,
- delta_files,
- upgraded,
+ version_info,
database_engine,
config,
databases=databases,
@@ -261,9 +283,7 @@ def _setup_new_database(
_upgrade_existing_database(
cur,
- current_version=max_current_ver,
- applied_delta_files=[],
- upgraded=False,
+ _SchemaState(current_version=max_current_ver, compat_version=None),
database_engine=database_engine,
config=None,
databases=databases,
@@ -273,9 +293,7 @@ def _setup_new_database(
def _upgrade_existing_database(
cur: Cursor,
- current_version: int,
- applied_delta_files: List[str],
- upgraded: bool,
+ current_schema_state: _SchemaState,
database_engine: BaseDatabaseEngine,
config: Optional[HomeServerConfig],
databases: Collection[str],
@@ -321,12 +339,8 @@ def _upgrade_existing_database(
Args:
cur
- current_version: The current version of the schema.
- applied_delta_files: A list of deltas that have already been applied.
- upgraded: Whether the current version was generated by having
- applied deltas or from full schema file. If `True` the function
- will never apply delta files for the given `current_version`, since
- the current_version wasn't generated by applying those delta files.
+ current_schema_state: The current version of the schema, as
+ returned by _get_or_create_schema_state
database_engine
config:
None if we are initialising a blank database, otherwise the application
@@ -337,13 +351,16 @@ def _upgrade_existing_database(
upgrade portions of the delta scripts.
"""
if is_empty:
- assert not applied_delta_files
+ assert not current_schema_state.applied_deltas
else:
assert config
is_worker = config and config.worker_app is not None
- if current_version > SCHEMA_VERSION:
+ if (
+ current_schema_state.compat_version is not None
+ and current_schema_state.compat_version > SCHEMA_VERSION
+ ):
raise ValueError(
"Cannot use this database as it is too "
+ "new for the server to understand"
@@ -357,14 +374,26 @@ def _upgrade_existing_database(
assert config is not None
check_database_before_upgrade(cur, database_engine, config)
- start_ver = current_version
+ # update schema_compat_version before we run any upgrades, so that if synapse
+ # gets downgraded again, it won't try to run against the upgraded database.
+ if (
+ current_schema_state.compat_version is None
+ or current_schema_state.compat_version < SCHEMA_COMPAT_VERSION
+ ):
+ cur.execute("DELETE FROM schema_compat_version")
+ cur.execute(
+ "INSERT INTO schema_compat_version(compat_version) VALUES (?)",
+ (SCHEMA_COMPAT_VERSION,),
+ )
+
+ start_ver = current_schema_state.current_version
# if we got to this schema version by running a full_schema rather than a series
# of deltas, we should not run the deltas for this version.
- if not upgraded:
+ if not current_schema_state.upgraded:
start_ver += 1
- logger.debug("applied_delta_files: %s", applied_delta_files)
+ logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
if isinstance(database_engine, PostgresEngine):
specific_engine_extension = ".postgres"
@@ -440,7 +469,7 @@ def _upgrade_existing_database(
absolute_path = entry.absolute_path
logger.debug("Found file: %s (%s)", relative_path, absolute_path)
- if relative_path in applied_delta_files:
+ if relative_path in current_schema_state.applied_deltas:
continue
root_name, ext = os.path.splitext(file_name)
@@ -621,7 +650,7 @@ def execute_statements_from_stream(cur: Cursor, f: TextIO) -> None:
def _get_or_create_schema_state(
txn: Cursor, database_engine: BaseDatabaseEngine
-) -> Optional[Tuple[int, List[str], bool]]:
+) -> Optional[_SchemaState]:
# Bluntly try creating the schema_version tables.
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
executescript(txn, sql_path)
@@ -629,17 +658,31 @@ def _get_or_create_schema_state(
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()
+ if row is None:
+ # new database
+ return None
+
+ current_version = int(row[0])
+ upgraded = bool(row[1])
+
+ compat_version: Optional[int] = None
+ txn.execute("SELECT compat_version FROM schema_compat_version")
+ row = txn.fetchone()
if row is not None:
- current_version = int(row[0])
- txn.execute(
- "SELECT file FROM applied_schema_deltas WHERE version >= ?",
- (current_version,),
- )
- applied_deltas = [d for d, in txn]
- upgraded = bool(row[1])
- return current_version, applied_deltas, upgraded
+ compat_version = int(row[0])
+
+ txn.execute(
+ "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+ (current_version,),
+ )
+ applied_deltas = tuple(d for d, in txn)
- return None
+ return _SchemaState(
+ current_version=current_version,
+ compat_version=compat_version,
+ applied_deltas=applied_deltas,
+ upgraded=upgraded,
+ )
@attr.s(slots=True)
diff --git a/synapse/storage/schema/README.md b/synapse/storage/schema/README.md
index 030153db64..729f44ea6c 100644
--- a/synapse/storage/schema/README.md
+++ b/synapse/storage/schema/README.md
@@ -1,37 +1,4 @@
# Synapse Database Schemas
-This directory contains the schema files used to build Synapse databases.
-
-Synapse supports splitting its datastore across multiple physical databases (which can
-be useful for large installations), and the schema files are therefore split according
-to the logical database they are apply to.
-
-At the time of writing, the following "logical" databases are supported:
-
-* `state` - used to store Matrix room state (more specifically, `state_groups`,
- their relationships and contents.)
-* `main` - stores everything else.
-
-Addionally, the `common` directory contains schema files for tables which must be
-present on *all* physical databases.
-
-## Full schema dumps
-
-In the `full_schemas` directories, only the most recently-numbered snapshot is useful
-(`54` at the time of writing). Older snapshots (eg, `16`) are present for historical
-reference only.
-
-## Building full schema dumps
-
-If you want to recreate these schemas, they need to be made from a database that
-has had all background updates run.
-
-To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
-`full.sql.postgres` and `full.sql.sqlite` files.
-
-Ensure postgres is installed, then run:
-
- ./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
-
-NB at the time of writing, this script predates the split into separate `state`/`main`
-databases so will require updates to handle that correctly.
+This directory contains the schema files used to build Synapse databases. For more
+information, see /docs/development/database_schema.md.
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index f0d9f23167..d36ba1d773 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,6 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Remember to update this number every time a change is made to database
-# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 59
+"""Represents the expectations made by the codebase about the database schema
+
+This should be incremented whenever the codebase changes its requirements on the
+shape of the database schema (even if those requirements are backwards-compatible with
+older versions of Synapse).
+
+See `README.md <synapse/storage/schema/README.md>`_ for more information on how this
+works.
+"""
+
+
+SCHEMA_COMPAT_VERSION = 59
+"""Limit on how far the synapse codebase can be rolled back without breaking db compat
+
+This value is stored in the database, and checked on startup. If the value in the
+database is greater than SCHEMA_VERSION, then Synapse will refuse to start.
+"""
diff --git a/synapse/storage/schema/common/schema_version.sql b/synapse/storage/schema/common/schema_version.sql
index 42e5cb6df5..f41fde5d2d 100644
--- a/synapse/storage/schema/common/schema_version.sql
+++ b/synapse/storage/schema/common/schema_version.sql
@@ -20,6 +20,13 @@ CREATE TABLE IF NOT EXISTS schema_version(
CHECK (Lock='X')
);
+CREATE TABLE IF NOT EXISTS schema_compat_version(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ -- The SCHEMA_VERSION of the oldest synapse this database can be used with
+ compat_version INTEGER NOT NULL,
+ CHECK (Lock='X')
+);
+
CREATE TABLE IF NOT EXISTS applied_schema_deltas(
version INTEGER NOT NULL,
file TEXT NOT NULL,
diff --git a/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql b/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql
new file mode 100644
index 0000000000..8eb2196f6a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/59/11add_knock_members_to_stats.sql
@@ -0,0 +1,20 @@
+/* Copyright 2020 Sorunome
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Existing rows will default to NULL, so anything reading from these tables
+-- needs to interpret NULL as 0. This is fine here as no existing rooms can have
+-- any knocked members.
+ALTER TABLE room_stats_current ADD COLUMN knocked_members INT;
+ALTER TABLE room_stats_historical ADD COLUMN knocked_members BIGINT;
|