diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 5aec43b702..c4d3087fa4 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -180,9 +180,7 @@ class Config(object):
Returns:
str: the yaml config file
"""
- default_config = "# vim:ft=yaml\n"
-
- default_config += "\n\n".join(
+ default_config = "\n\n".join(
dedent(conf)
for conf in self.invoke_all(
"default_config",
@@ -297,19 +295,26 @@ class Config(object):
"Must specify a server_name to a generate config for."
" Pass -H server.name."
)
+
+ config_str = obj.generate_config(
+ config_dir_path=config_dir_path,
+ data_dir_path=os.getcwd(),
+ server_name=server_name,
+ report_stats=(config_args.report_stats == "yes"),
+ generate_secrets=True,
+ )
+
if not cls.path_exists(config_dir_path):
os.makedirs(config_dir_path)
with open(config_path, "w") as config_file:
- config_str = obj.generate_config(
- config_dir_path=config_dir_path,
- data_dir_path=os.getcwd(),
- server_name=server_name,
- report_stats=(config_args.report_stats == "yes"),
- generate_secrets=True,
+ config_file.write(
+ "# vim:ft=yaml\n\n"
)
- config = yaml.load(config_str)
- obj.invoke_all("generate_files", config)
config_file.write(config_str)
+
+ config = yaml.load(config_str)
+ obj.invoke_all("generate_files", config)
+
print(
(
"A config file has been generated in %r for server name"
diff --git a/synapse/config/database.py b/synapse/config/database.py
index c8890147a6..63e9cb63f8 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -49,7 +49,8 @@ class DatabaseConfig(Config):
def default_config(self, data_dir_path, **kwargs):
database_path = os.path.join(data_dir_path, "homeserver.db")
return """\
- # Database configuration
+ ## Database ##
+
database:
# The database engine name
name: "sqlite3"
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index f6940b65fd..464c28c2d9 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -81,7 +81,9 @@ class LoggingConfig(Config):
def default_config(self, config_dir_path, server_name, **kwargs):
log_config = os.path.join(config_dir_path, server_name + ".log.config")
- return """
+ return """\
+ ## Logging ##
+
# A yaml python logging config file
#
log_config: "%(log_config)s"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4200f10da3..35a322fee0 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -260,9 +260,11 @@ class ServerConfig(Config):
# This is used by remote servers to connect to this server,
# e.g. matrix.org, localhost:8080, etc.
# This is also the last part of your UserID.
+ #
server_name: "%(server_name)s"
# When running as a daemon, the file to store the pid in
+ #
pid_file: %(pid_file)s
# CPU affinity mask. Setting this restricts the CPUs on which the
@@ -304,9 +306,11 @@ class ServerConfig(Config):
# Set the soft limit on the number of file descriptors synapse can use
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
+ #
soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
+ #
use_presence: true
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 8b2d03a756..1728089667 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,7 +16,6 @@ import logging
from twisted.internet import defer
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from ._base import BaseHandler
@@ -39,31 +38,6 @@ class ReceiptsHandler(BaseHandler):
self.state = hs.get_state_handler()
@defer.inlineCallbacks
- def received_client_receipt(self, room_id, receipt_type, user_id,
- event_id):
- """Called when a client tells us a local user has read up to the given
- event_id in the room.
- """
- receipt = {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": [event_id],
- "data": {
- "ts": int(self.clock.time_msec()),
- }
- }
-
- is_new = yield self._handle_new_receipts([receipt])
-
- if is_new:
- # fire off a process in the background to send the receipt to
- # remote servers
- run_as_background_process(
- 'push_receipts_to_remotes', self._push_remotes, receipt
- )
-
- @defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
@@ -128,43 +102,54 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
@defer.inlineCallbacks
- def _push_remotes(self, receipt):
- """Given a receipt, works out which remote servers should be
- poked and pokes them.
+ def received_client_receipt(self, room_id, receipt_type, user_id,
+ event_id):
+ """Called when a client tells us a local user has read up to the given
+ event_id in the room.
"""
- try:
- # TODO: optimise this to move some of the work to the workers.
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
+ receipt = {
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ "event_ids": [event_id],
+ "data": {
+ "ts": int(self.clock.time_msec()),
+ }
+ }
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.build_and_send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
+ is_new = yield self._handle_new_receipts([receipt])
+ if not is_new:
+ return
+
+ # Work out which remote servers should be poked and poke them.
+
+ # TODO: optimise this to move some of the work to the workers.
+ data = receipt["data"]
+
+ # XXX why does this not use state.get_current_hosts_in_room() ?
+ users = yield self.state.get_current_user_in_room(room_id)
+ remotedomains = set(get_domain_from_id(u) for u in users)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
+
+ logger.debug("Sending receipt to: %r", remotedomains)
+
+ for domain in remotedomains:
+ self.federation.build_and_send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content={
+ room_id: {
+ receipt_type: {
+ user_id: {
+ "event_ids": [event_id],
+ "data": data,
}
- },
+ }
},
- key=(room_id, receipt_type, user_id),
- )
- except Exception:
- logger.exception("Error pushing receipts to remote servers")
+ },
+ key=(room_id, receipt_type, user_id),
+ )
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 0ac665e967..0fd1ccc40a 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
+ """Inserts a read-receipt into the database if it's newer than the current RR
+
+ Returns: int|None
+ None if the RR is older than the current RR
+ otherwise, the rx timestamp of the event that the RR corresponds to
+ (or 0 if the event is unknown)
+ """
res = self._simple_select_one_txn(
txn,
table="events",
- retcols=["topological_ordering", "stream_ordering"],
+ retcols=["stream_ordering", "received_ts"],
keyvalues={"event_id": event_id},
allow_none=True
)
stream_ordering = int(res["stream_ordering"]) if res else None
+ rx_ts = res["received_ts"] if res else 0
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
@@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
"one for later event %s",
event_id, eid,
)
- return False
+ return None
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
@@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_ordering=stream_ordering,
)
- return True
+ return rx_ts
@defer.inlineCallbacks
def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
@@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_id_manager = self._receipts_id_gen.get_next()
with stream_id_manager as stream_id:
- have_persisted = yield self.runInteraction(
+ event_ts = yield self.runInteraction(
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
room_id, receipt_type, user_id, linearized_event_id,
@@ -474,8 +482,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_id=stream_id,
)
- if not have_persisted:
- defer.returnValue(None)
+ if event_ts is None:
+ defer.returnValue(None)
+
+ now = self._clock.time_msec()
+ logger.debug(
+ "RR for event %s in %s (%i ms old)",
+ linearized_event_id, room_id, now - event_ts,
+ )
yield self.insert_graph_receipt(
room_id, receipt_type, user_id, event_ids, data
|