diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eacd49d6a5..8cdfd50f90 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore,
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
- * Users who have created their accounts more than 30 days
+ * Users who have created their accounts more than 30 days ago
* Where last seen at most 30 days ago
- * Where account creation and last_seen are > 30 days
+ * Where account creation and last_seen are > 30 days apart
Returns counts globaly for a given user as well as breaking
by platform
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2fbebd4907..2262776ab2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -376,7 +376,7 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
- col_headers = list(intern(column[0]) for column in cursor.description)
+ col_headers = list(intern(str(column[0])) for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor
)
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 338b495611..8c868ece75 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
+import platform
SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
+ # pypy requires psycopg2cffi rather than psycopg2
+ if (name == "psycopg2" and
+ platform.python_implementation() == "PyPy"):
+ name = "psycopg2cffi"
module = importlib.import_module(name)
return engine_class(module, database_config)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ece5e6c41f..da44b52fd6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
+ synapse.metrics.event_persisted_position.set(
+ chunk[-1][0].internal_metadata.stream_ordering,
+ )
for event, context in chunk:
if context.app_service:
origin_type = "local"
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78ba..a937b9bceb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
+ def get_received_ts(self, event_id):
+ """Get received_ts (when it was persisted) for the event.
+
+ Raises an exception for unknown events.
+
+ Args:
+ event_id (str)
+
+ Returns:
+ Deferred[int|None]: Timestamp in milliseconds, or None for events
+ that were persisted before received_ts was implemented.
+ """
+ return self._simple_select_one_onecol(
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ desc="get_received_ts",
+ )
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
|