diff options
author | Erik Johnston <erik@matrix.org> | 2015-03-09 13:29:41 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-03-09 13:29:41 +0000 |
commit | f31e65ca8b3a056b81c9ee1c8e5be298e36ed495 (patch) | |
tree | 414d6a488f090cea0aff41ef8ca7346f47567a62 /contrib | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj-perf (diff) | |
parent | D'oh: underscore, not hyphen (diff) | |
download | synapse-f31e65ca8b3a056b81c9ee1c8e5be298e36ed495.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj-perf
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/graph/graph2.py | 3 | ||||
-rw-r--r-- | contrib/jitsimeetbridge/jitsimeetbridge.py | 410 | ||||
-rwxr-xr-x | contrib/vertobot/bridge.pl | 489 |
3 files changed, 696 insertions, 206 deletions
diff --git a/contrib/graph/graph2.py b/contrib/graph/graph2.py index 6b551d42e5..d0d2cfe7c0 100644 --- a/contrib/graph/graph2.py +++ b/contrib/graph/graph2.py @@ -21,6 +21,7 @@ import datetime import argparse from synapse.events import FrozenEvent +from synapse.util.frozenutils import unfreeze def make_graph(db_name, room_id, file_prefix, limit): @@ -70,7 +71,7 @@ def make_graph(db_name, room_id, file_prefix, limit): float(event.origin_server_ts) / 1000 ).strftime('%Y-%m-%d %H:%M:%S,%f') - content = json.dumps(event.get_dict()["content"]) + content = json.dumps(unfreeze(event.get_dict()["content"])) label = ( "<" diff --git a/contrib/jitsimeetbridge/jitsimeetbridge.py b/contrib/jitsimeetbridge/jitsimeetbridge.py index dbc6f6ffa5..15f8e1c48b 100644 --- a/contrib/jitsimeetbridge/jitsimeetbridge.py +++ b/contrib/jitsimeetbridge/jitsimeetbridge.py @@ -39,43 +39,43 @@ ROOMDOMAIN="meet.jit.si" #ROOMDOMAIN="conference.jitsi.vuc.me" class TrivialMatrixClient: - def __init__(self, access_token): - self.token = None - self.access_token = access_token - - def getEvent(self): - while True: - url = MATRIXBASE+'events?access_token='+self.access_token+"&timeout=60000" - if self.token: - url += "&from="+self.token - req = grequests.get(url) - resps = grequests.map([req]) - obj = json.loads(resps[0].content) - print "incoming from matrix",obj - if 'end' not in obj: - continue - self.token = obj['end'] - if len(obj['chunk']): - return obj['chunk'][0] - - def joinRoom(self, roomId): - url = MATRIXBASE+'rooms/'+roomId+'/join?access_token='+self.access_token - print url - headers={ 'Content-Type': 'application/json' } - req = grequests.post(url, headers=headers, data='{}') - resps = grequests.map([req]) - obj = json.loads(resps[0].content) - print "response: ",obj - - def sendEvent(self, roomId, evType, event): - url = MATRIXBASE+'rooms/'+roomId+'/send/'+evType+'?access_token='+self.access_token - print url - print json.dumps(event) - headers={ 'Content-Type': 'application/json' } - req = grequests.post(url, headers=headers, data=json.dumps(event)) - resps = grequests.map([req]) - obj = json.loads(resps[0].content) - print "response: ",obj + def __init__(self, access_token): + self.token = None + self.access_token = access_token + + def getEvent(self): + while True: + url = MATRIXBASE+'events?access_token='+self.access_token+"&timeout=60000" + if self.token: + url += "&from="+self.token + req = grequests.get(url) + resps = grequests.map([req]) + obj = json.loads(resps[0].content) + print "incoming from matrix",obj + if 'end' not in obj: + continue + self.token = obj['end'] + if len(obj['chunk']): + return obj['chunk'][0] + + def joinRoom(self, roomId): + url = MATRIXBASE+'rooms/'+roomId+'/join?access_token='+self.access_token + print url + headers={ 'Content-Type': 'application/json' } + req = grequests.post(url, headers=headers, data='{}') + resps = grequests.map([req]) + obj = json.loads(resps[0].content) + print "response: ",obj + + def sendEvent(self, roomId, evType, event): + url = MATRIXBASE+'rooms/'+roomId+'/send/'+evType+'?access_token='+self.access_token + print url + print json.dumps(event) + headers={ 'Content-Type': 'application/json' } + req = grequests.post(url, headers=headers, data=json.dumps(event)) + resps = grequests.map([req]) + obj = json.loads(resps[0].content) + print "response: ",obj @@ -83,178 +83,178 @@ xmppClients = {} def matrixLoop(): - while True: - ev = matrixCli.getEvent() - print ev - if ev['type'] == 'm.room.member': - print 'membership event' - if ev['membership'] == 'invite' and ev['state_key'] == MYUSERNAME: - roomId = ev['room_id'] - print "joining room %s" % (roomId) - matrixCli.joinRoom(roomId) - elif ev['type'] == 'm.room.message': - if ev['room_id'] in xmppClients: - print "already have a bridge for that user, ignoring" - continue - print "got message, connecting" - xmppClients[ev['room_id']] = TrivialXmppClient(ev['room_id'], ev['user_id']) - gevent.spawn(xmppClients[ev['room_id']].xmppLoop) - elif ev['type'] == 'm.call.invite': - print "Incoming call" - #sdp = ev['content']['offer']['sdp'] - #print "sdp: %s" % (sdp) - #xmppClients[ev['room_id']] = TrivialXmppClient(ev['room_id'], ev['user_id']) - #gevent.spawn(xmppClients[ev['room_id']].xmppLoop) - elif ev['type'] == 'm.call.answer': - print "Call answered" - sdp = ev['content']['answer']['sdp'] - if ev['room_id'] not in xmppClients: - print "We didn't have a call for that room" - continue - # should probably check call ID too - xmppCli = xmppClients[ev['room_id']] - xmppCli.sendAnswer(sdp) - elif ev['type'] == 'm.call.hangup': - if ev['room_id'] in xmppClients: - xmppClients[ev['room_id']].stop() - del xmppClients[ev['room_id']] - + while True: + ev = matrixCli.getEvent() + print ev + if ev['type'] == 'm.room.member': + print 'membership event' + if ev['membership'] == 'invite' and ev['state_key'] == MYUSERNAME: + roomId = ev['room_id'] + print "joining room %s" % (roomId) + matrixCli.joinRoom(roomId) + elif ev['type'] == 'm.room.message': + if ev['room_id'] in xmppClients: + print "already have a bridge for that user, ignoring" + continue + print "got message, connecting" + xmppClients[ev['room_id']] = TrivialXmppClient(ev['room_id'], ev['user_id']) + gevent.spawn(xmppClients[ev['room_id']].xmppLoop) + elif ev['type'] == 'm.call.invite': + print "Incoming call" + #sdp = ev['content']['offer']['sdp'] + #print "sdp: %s" % (sdp) + #xmppClients[ev['room_id']] = TrivialXmppClient(ev['room_id'], ev['user_id']) + #gevent.spawn(xmppClients[ev['room_id']].xmppLoop) + elif ev['type'] == 'm.call.answer': + print "Call answered" + sdp = ev['content']['answer']['sdp'] + if ev['room_id'] not in xmppClients: + print "We didn't have a call for that room" + continue + # should probably check call ID too + xmppCli = xmppClients[ev['room_id']] + xmppCli.sendAnswer(sdp) + elif ev['type'] == 'm.call.hangup': + if ev['room_id'] in xmppClients: + xmppClients[ev['room_id']].stop() + del xmppClients[ev['room_id']] + class TrivialXmppClient: - def __init__(self, matrixRoom, userId): - self.rid = 0 - self.matrixRoom = matrixRoom - self.userId = userId - self.running = True - - def stop(self): - self.running = False - - def nextRid(self): - self.rid += 1 - return '%d' % (self.rid) - - def sendIq(self, xml): - fullXml = "<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' sid='%s'>%s</body>" % (self.nextRid(), self.sid, xml) - #print "\t>>>%s" % (fullXml) - return self.xmppPoke(fullXml) - - def xmppPoke(self, xml): - headers = {'Content-Type': 'application/xml'} - req = grequests.post(HTTPBIND, verify=False, headers=headers, data=xml) - resps = grequests.map([req]) - obj = BeautifulSoup(resps[0].content) - return obj - - def sendAnswer(self, answer): - print "sdp from matrix client",answer - p = subprocess.Popen(['node', 'unjingle/unjingle.js', '--sdp'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) - jingle, out_err = p.communicate(answer) - jingle = jingle % { - 'tojid': self.callfrom, - 'action': 'session-accept', - 'initiator': self.callfrom, - 'responder': self.jid, - 'sid': self.callsid - } - print "answer jingle from sdp",jingle - res = self.sendIq(jingle) - print "reply from answer: ",res - - self.ssrcs = {} - jingleSoup = BeautifulSoup(jingle) - for cont in jingleSoup.iq.jingle.findAll('content'): - if cont.description: - self.ssrcs[cont['name']] = cont.description['ssrc'] - print "my ssrcs:",self.ssrcs - - gevent.joinall([ - gevent.spawn(self.advertiseSsrcs) - ]) - - def advertiseSsrcs(self): + def __init__(self, matrixRoom, userId): + self.rid = 0 + self.matrixRoom = matrixRoom + self.userId = userId + self.running = True + + def stop(self): + self.running = False + + def nextRid(self): + self.rid += 1 + return '%d' % (self.rid) + + def sendIq(self, xml): + fullXml = "<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' sid='%s'>%s</body>" % (self.nextRid(), self.sid, xml) + #print "\t>>>%s" % (fullXml) + return self.xmppPoke(fullXml) + + def xmppPoke(self, xml): + headers = {'Content-Type': 'application/xml'} + req = grequests.post(HTTPBIND, verify=False, headers=headers, data=xml) + resps = grequests.map([req]) + obj = BeautifulSoup(resps[0].content) + return obj + + def sendAnswer(self, answer): + print "sdp from matrix client",answer + p = subprocess.Popen(['node', 'unjingle/unjingle.js', '--sdp'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + jingle, out_err = p.communicate(answer) + jingle = jingle % { + 'tojid': self.callfrom, + 'action': 'session-accept', + 'initiator': self.callfrom, + 'responder': self.jid, + 'sid': self.callsid + } + print "answer jingle from sdp",jingle + res = self.sendIq(jingle) + print "reply from answer: ",res + + self.ssrcs = {} + jingleSoup = BeautifulSoup(jingle) + for cont in jingleSoup.iq.jingle.findAll('content'): + if cont.description: + self.ssrcs[cont['name']] = cont.description['ssrc'] + print "my ssrcs:",self.ssrcs + + gevent.joinall([ + gevent.spawn(self.advertiseSsrcs) + ]) + + def advertiseSsrcs(self): time.sleep(7) - print "SSRC spammer started" - while self.running: - ssrcMsg = "<presence to='%(tojid)s' xmlns='jabber:client'><x xmlns='http://jabber.org/protocol/muc'/><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://jitsi.org/jitsimeet' ver='0WkSdhFnAUxrz4ImQQLdB80GFlE='/><nick xmlns='http://jabber.org/protocol/nick'>%(nick)s</nick><stats xmlns='http://jitsi.org/jitmeet/stats'><stat name='bitrate_download' value='175'/><stat name='bitrate_upload' value='176'/><stat name='packetLoss_total' value='0'/><stat name='packetLoss_download' value='0'/><stat name='packetLoss_upload' value='0'/></stats><media xmlns='http://estos.de/ns/mjs'><source type='audio' ssrc='%(assrc)s' direction='sendre'/><source type='video' ssrc='%(vssrc)s' direction='sendre'/></media></presence>" % { 'tojid': "%s@%s/%s" % (ROOMNAME, ROOMDOMAIN, self.shortJid), 'nick': self.userId, 'assrc': self.ssrcs['audio'], 'vssrc': self.ssrcs['video'] } - res = self.sendIq(ssrcMsg) - print "reply from ssrc announce: ",res - time.sleep(10) - - - - def xmppLoop(self): - self.matrixCallId = time.time() - res = self.xmppPoke("<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' to='%s' xml:lang='en' wait='60' hold='1' content='text/xml; charset=utf-8' ver='1.6' xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'/>" % (self.nextRid(), HOST)) - - print res - self.sid = res.body['sid'] - print "sid %s" % (self.sid) - - res = self.sendIq("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='ANONYMOUS'/>") - - res = self.xmppPoke("<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' sid='%s' to='%s' xml:lang='en' xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'/>" % (self.nextRid(), self.sid, HOST)) - - res = self.sendIq("<iq type='set' id='_bind_auth_2' xmlns='jabber:client'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>") - print res - - self.jid = res.body.iq.bind.jid.string - print "jid: %s" % (self.jid) - self.shortJid = self.jid.split('-')[0] - - res = self.sendIq("<iq type='set' id='_session_auth_2' xmlns='jabber:client'><session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>") - - #randomthing = res.body.iq['to'] - #whatsitpart = randomthing.split('-')[0] - - #print "other random bind thing: %s" % (randomthing) - - # advertise preence to the jitsi room, with our nick - res = self.sendIq("<iq type='get' to='%s' xmlns='jabber:client' id='1:sendIQ'><services xmlns='urn:xmpp:extdisco:1'><service host='%s'/></services></iq><presence to='%s@%s/d98f6c40' xmlns='jabber:client'><x xmlns='http://jabber.org/protocol/muc'/><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://jitsi.org/jitsimeet' ver='0WkSdhFnAUxrz4ImQQLdB80GFlE='/><nick xmlns='http://jabber.org/protocol/nick'>%s</nick></presence>" % (HOST, TURNSERVER, ROOMNAME, ROOMDOMAIN, self.userId)) - self.muc = {'users': []} - for p in res.body.findAll('presence'): - u = {} - u['shortJid'] = p['from'].split('/')[1] - if p.c and p.c.nick: - u['nick'] = p.c.nick.string - self.muc['users'].append(u) - print "muc: ",self.muc - - # wait for stuff - while True: - print "waiting..." - res = self.sendIq("") - print "got from stream: ",res - if res.body.iq: - jingles = res.body.iq.findAll('jingle') - if len(jingles): - self.callfrom = res.body.iq['from'] - self.handleInvite(jingles[0]) - elif 'type' in res.body and res.body['type'] == 'terminate': - self.running = False - del xmppClients[self.matrixRoom] - return - - def handleInvite(self, jingle): - self.initiator = jingle['initiator'] - self.callsid = jingle['sid'] - p = subprocess.Popen(['node', 'unjingle/unjingle.js', '--jingle'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) - print "raw jingle invite",str(jingle) - sdp, out_err = p.communicate(str(jingle)) - print "transformed remote offer sdp",sdp - inviteEvent = { - 'offer': { - 'type': 'offer', - 'sdp': sdp - }, - 'call_id': self.matrixCallId, - 'version': 0, - 'lifetime': 30000 - } - matrixCli.sendEvent(self.matrixRoom, 'm.call.invite', inviteEvent) - + print "SSRC spammer started" + while self.running: + ssrcMsg = "<presence to='%(tojid)s' xmlns='jabber:client'><x xmlns='http://jabber.org/protocol/muc'/><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://jitsi.org/jitsimeet' ver='0WkSdhFnAUxrz4ImQQLdB80GFlE='/><nick xmlns='http://jabber.org/protocol/nick'>%(nick)s</nick><stats xmlns='http://jitsi.org/jitmeet/stats'><stat name='bitrate_download' value='175'/><stat name='bitrate_upload' value='176'/><stat name='packetLoss_total' value='0'/><stat name='packetLoss_download' value='0'/><stat name='packetLoss_upload' value='0'/></stats><media xmlns='http://estos.de/ns/mjs'><source type='audio' ssrc='%(assrc)s' direction='sendre'/><source type='video' ssrc='%(vssrc)s' direction='sendre'/></media></presence>" % { 'tojid': "%s@%s/%s" % (ROOMNAME, ROOMDOMAIN, self.shortJid), 'nick': self.userId, 'assrc': self.ssrcs['audio'], 'vssrc': self.ssrcs['video'] } + res = self.sendIq(ssrcMsg) + print "reply from ssrc announce: ",res + time.sleep(10) + + + + def xmppLoop(self): + self.matrixCallId = time.time() + res = self.xmppPoke("<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' to='%s' xml:lang='en' wait='60' hold='1' content='text/xml; charset=utf-8' ver='1.6' xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'/>" % (self.nextRid(), HOST)) + + print res + self.sid = res.body['sid'] + print "sid %s" % (self.sid) + + res = self.sendIq("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='ANONYMOUS'/>") + + res = self.xmppPoke("<body rid='%s' xmlns='http://jabber.org/protocol/httpbind' sid='%s' to='%s' xml:lang='en' xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'/>" % (self.nextRid(), self.sid, HOST)) + + res = self.sendIq("<iq type='set' id='_bind_auth_2' xmlns='jabber:client'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>") + print res + + self.jid = res.body.iq.bind.jid.string + print "jid: %s" % (self.jid) + self.shortJid = self.jid.split('-')[0] + + res = self.sendIq("<iq type='set' id='_session_auth_2' xmlns='jabber:client'><session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>") + + #randomthing = res.body.iq['to'] + #whatsitpart = randomthing.split('-')[0] + + #print "other random bind thing: %s" % (randomthing) + + # advertise preence to the jitsi room, with our nick + res = self.sendIq("<iq type='get' to='%s' xmlns='jabber:client' id='1:sendIQ'><services xmlns='urn:xmpp:extdisco:1'><service host='%s'/></services></iq><presence to='%s@%s/d98f6c40' xmlns='jabber:client'><x xmlns='http://jabber.org/protocol/muc'/><c xmlns='http://jabber.org/protocol/caps' hash='sha-1' node='http://jitsi.org/jitsimeet' ver='0WkSdhFnAUxrz4ImQQLdB80GFlE='/><nick xmlns='http://jabber.org/protocol/nick'>%s</nick></presence>" % (HOST, TURNSERVER, ROOMNAME, ROOMDOMAIN, self.userId)) + self.muc = {'users': []} + for p in res.body.findAll('presence'): + u = {} + u['shortJid'] = p['from'].split('/')[1] + if p.c and p.c.nick: + u['nick'] = p.c.nick.string + self.muc['users'].append(u) + print "muc: ",self.muc + + # wait for stuff + while True: + print "waiting..." + res = self.sendIq("") + print "got from stream: ",res + if res.body.iq: + jingles = res.body.iq.findAll('jingle') + if len(jingles): + self.callfrom = res.body.iq['from'] + self.handleInvite(jingles[0]) + elif 'type' in res.body and res.body['type'] == 'terminate': + self.running = False + del xmppClients[self.matrixRoom] + return + + def handleInvite(self, jingle): + self.initiator = jingle['initiator'] + self.callsid = jingle['sid'] + p = subprocess.Popen(['node', 'unjingle/unjingle.js', '--jingle'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + print "raw jingle invite",str(jingle) + sdp, out_err = p.communicate(str(jingle)) + print "transformed remote offer sdp",sdp + inviteEvent = { + 'offer': { + 'type': 'offer', + 'sdp': sdp + }, + 'call_id': self.matrixCallId, + 'version': 0, + 'lifetime': 30000 + } + matrixCli.sendEvent(self.matrixRoom, 'm.call.invite', inviteEvent) + matrixCli = TrivialMatrixClient(ACCESS_TOKEN) gevent.joinall([ - gevent.spawn(matrixLoop) + gevent.spawn(matrixLoop) ]) diff --git a/contrib/vertobot/bridge.pl b/contrib/vertobot/bridge.pl new file mode 100755 index 0000000000..e1a07f6659 --- /dev/null +++ b/contrib/vertobot/bridge.pl @@ -0,0 +1,489 @@ +#!/usr/bin/env perl + +use strict; +use warnings; +use 5.010; # // +use IO::Socket::SSL qw(SSL_VERIFY_NONE); +use IO::Async::Loop; +use Net::Async::WebSocket::Client; +use Net::Async::HTTP; +use Net::Async::HTTP::Server; +use JSON; +use YAML; +use Data::UUID; +use Getopt::Long; +use Data::Dumper; +use URI::Encode qw(uri_encode uri_decode); + +binmode STDOUT, ":encoding(UTF-8)"; +binmode STDERR, ":encoding(UTF-8)"; + +my $msisdn_to_matrix = { + '447417892400' => '@matthew:matrix.org', +}; + +my $matrix_to_msisdn = {}; +foreach (keys %$msisdn_to_matrix) { + $matrix_to_msisdn->{$msisdn_to_matrix->{$_}} = $_; +} + + +my $loop = IO::Async::Loop->new; +# Net::Async::HTTP + SSL + IO::Poll doesn't play well. See +# https://rt.cpan.org/Ticket/Display.html?id=93107 +# ref $loop eq "IO::Async::Loop::Poll" and +# warn "Using SSL with IO::Poll causes known memory-leaks!!\n"; + +GetOptions( + 'C|config=s' => \my $CONFIG, + 'eval-from=s' => \my $EVAL_FROM, +) or exit 1; + +if( defined $EVAL_FROM ) { + # An emergency 'eval() this file' hack + $SIG{HUP} = sub { + my $code = do { + open my $fh, "<", $EVAL_FROM or warn( "Cannot read - $!" ), return; + local $/; <$fh> + }; + + eval $code or warn "Cannot eval() - $@"; + }; +} + +defined $CONFIG or die "Must supply --config\n"; + +my %CONFIG = %{ YAML::LoadFile( $CONFIG ) }; + +my %MATRIX_CONFIG = %{ $CONFIG{matrix} }; +# No harm in always applying this +$MATRIX_CONFIG{SSL_verify_mode} = SSL_VERIFY_NONE; + +my $bridgestate = {}; +my $roomid_by_callid = {}; + +my $sessid = lc new Data::UUID->create_str(); +my $as_token = $CONFIG{"matrix-bot"}->{as_token}; +my $hs_domain = $CONFIG{"matrix-bot"}->{domain}; + +my $http = Net::Async::HTTP->new(); +$loop->add( $http ); + +sub create_virtual_user +{ + my ($localpart) = @_; + my ( $response ) = $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/register?". + "access_token=$as_token&user_id=$localpart" + ), + content_type => "application/json", + content => <<EOT +{ + "type": "m.login.application_service", + "user": "$localpart" +} +EOT + )->get; + warn $response->as_string if ($response->code != 200); +} + +my $http_server = Net::Async::HTTP::Server->new( + on_request => sub { + my $self = shift; + my ( $req ) = @_; + + my $response; + my $path = uri_decode($req->path); + warn("request: $path"); + if ($path =~ m#/users/\@(\+.*)#) { + # when queried about virtual users, auto-create them in the HS + my $localpart = $1; + create_virtual_user($localpart); + $response = HTTP::Response->new( 200 ); + $response->add_content('{}'); + $response->content_type( "application/json" ); + } + elsif ($path =~ m#/transactions/(.*)#) { + my $event = JSON->new->decode($req->body); + print Dumper($event); + + my $room_id = $event->{room_id}; + my %dp = %{$CONFIG{'verto-dialog-params'}}; + $dp{callID} = $bridgestate->{$room_id}->{callid}; + + if ($event->{type} eq 'm.room.membership') { + my $membership = $event->{content}->{membership}; + my $state_key = $event->{state_key}; + my $room_id = $event->{state_id}; + + if ($membership eq 'invite') { + # autojoin invites + my ( $response ) = $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/rooms/$room_id/join?". + "access_token=$as_token&user_id=$state_key" + ), + content_type => "application/json", + content => "{}", + )->get; + warn $response->as_string if ($response->code != 200); + } + } + elsif ($event->{type} eq 'm.call.invite') { + my $room_id = $event->{room_id}; + $bridgestate->{$room_id}->{matrix_callid} = $event->{content}->{call_id}; + $bridgestate->{$room_id}->{callid} = lc new Data::UUID->create_str(); + $bridgestate->{$room_id}->{sessid} = $sessid; + # $bridgestate->{$room_id}->{offer} = $event->{content}->{offer}->{sdp}; + my $offer = $event->{content}->{offer}->{sdp}; + # $bridgestate->{$room_id}->{gathered_candidates} = 0; + $roomid_by_callid->{ $bridgestate->{$room_id}->{callid} } = $room_id; + # no trickle ICE in verto apparently + + my $f = send_verto_json_request("verto.invite", { + "sdp" => $offer, + "dialogParams" => \%dp, + "sessid" => $bridgestate->{$room_id}->{sessid}, + }); + $self->adopt_future($f); + } + # elsif ($event->{type} eq 'm.call.candidates') { + # # XXX: this could fire for both matrix->verto and verto->matrix calls + # # and races as it collects candidates. much better to just turn off + # # candidate gathering in the webclient entirely for now + # + # my $room_id = $event->{room_id}; + # # XXX: compare call IDs + # if (!$bridgestate->{$room_id}->{gathered_candidates}) { + # $bridgestate->{$room_id}->{gathered_candidates} = 1; + # my $offer = $bridgestate->{$room_id}->{offer}; + # my $candidate_block = ""; + # foreach (@{$event->{content}->{candidates}}) { + # $candidate_block .= "a=" . $_->{candidate} . "\r\n"; + # } + # # XXX: collate using the right m= line - for now assume audio call + # $offer =~ s/(a=rtcp.*[\r\n]+)/$1$candidate_block/; + # + # my $f = send_verto_json_request("verto.invite", { + # "sdp" => $offer, + # "dialogParams" => \%dp, + # "sessid" => $bridgestate->{$room_id}->{sessid}, + # }); + # $self->adopt_future($f); + # } + # else { + # # ignore them, as no trickle ICE, although we might as well + # # batch them up + # # foreach (@{$event->{content}->{candidates}}) { + # # push @{$bridgestate->{$room_id}->{candidates}}, $_; + # # } + # } + # } + elsif ($event->{type} eq 'm.call.answer') { + # grab the answer and relay it to verto as a verto.answer + my $room_id = $event->{room_id}; + + my $answer = $event->{content}->{answer}->{sdp}; + my $f = send_verto_json_request("verto.answer", { + "sdp" => $answer, + "dialogParams" => \%dp, + "sessid" => $bridgestate->{$room_id}->{sessid}, + }); + $self->adopt_future($f); + } + elsif ($event->{type} eq 'm.call.hangup') { + my $room_id = $event->{room_id}; + if ($bridgestate->{$room_id}->{matrix_callid} eq $event->{content}->{call_id}) { + my $f = send_verto_json_request("verto.bye", { + "dialogParams" => \%dp, + "sessid" => $bridgestate->{$room_id}->{sessid}, + }); + $self->adopt_future($f); + } + else { + warn "Ignoring unrecognised callid: ".$event->{content}->{call_id}; + } + } + else { + warn "Unhandled event: $event->{type}"; + } + + $response = HTTP::Response->new( 200 ); + $response->add_content('{}'); + $response->content_type( "application/json" ); + } + else { + warn "Unhandled path: $path"; + $response = HTTP::Response->new( 404 ); + } + + $req->respond( $response ); + }, +); +$loop->add( $http_server ); + +$http_server->listen( + addr => { family => "inet", socktype => "stream", port => 8009 }, + on_listen_error => sub { die "Cannot listen - $_[-1]\n" }, +); + +my $bot_verto = Net::Async::WebSocket::Client->new( + on_frame => sub { + my ( $self, $frame ) = @_; + warn "[Verto] receiving $frame"; + on_verto_json($frame); + }, +); +$loop->add( $bot_verto ); + +my $verto_connecting = $loop->new_future; +$bot_verto->connect( + %{ $CONFIG{"verto-bot"} }, + on_connected => sub { + warn("[Verto] connected to websocket"); + if (not $verto_connecting->is_done) { + $verto_connecting->done($bot_verto); + + send_verto_json_request("login", { + 'login' => $CONFIG{'verto-dialog-params'}{'login'}, + 'passwd' => $CONFIG{'verto-config'}{'passwd'}, + 'sessid' => $sessid, + }); + } + }, + on_connect_error => sub { die "Cannot connect to verto - $_[-1]" }, + on_resolve_error => sub { die "Cannot resolve to verto - $_[-1]" }, +); + +# die Dumper($verto_connecting); + +my $as_url = $CONFIG{"matrix-bot"}->{as_url}; + +Future->needs_all( + $http->do_request( + method => "POST", + uri => URI->new( $CONFIG{"matrix"}->{server}."/_matrix/appservice/v1/register" ), + content_type => "application/json", + content => <<EOT +{ + "as_token": "$as_token", + "url": "$as_url", + "namespaces": { "users": ["\@\\\\+.*"] } +} +EOT + ), + $verto_connecting, +)->get; + +$loop->attach_signal( + PIPE => sub { warn "pipe\n" } +); +$loop->attach_signal( + INT => sub { $loop->stop }, +); +$loop->attach_signal( + TERM => sub { $loop->stop }, +); + +eval { + $loop->run; +} or my $e = $@; + +die $e if $e; + +exit 0; + +{ + my $json_id; + my $requests; + + sub send_verto_json_request + { + $json_id ||= 1; + + my ($method, $params) = @_; + my $json = { + jsonrpc => "2.0", + method => $method, + params => $params, + id => $json_id, + }; + my $text = JSON->new->encode( $json ); + warn "[Verto] sending $text"; + $bot_verto->send_frame ( $text ); + my $request = $loop->new_future; + $requests->{$json_id} = $request; + $json_id++; + return $request; + } + + sub send_verto_json_response + { + my ($result, $id) = @_; + my $json = { + jsonrpc => "2.0", + result => $result, + id => $id, + }; + my $text = JSON->new->encode( $json ); + warn "[Verto] sending $text"; + $bot_verto->send_frame ( $text ); + } + + sub on_verto_json + { + my $json = JSON->new->decode( $_[0] ); + if ($json->{method}) { + if (($json->{method} eq 'verto.answer' && $json->{params}->{sdp}) || + $json->{method} eq 'verto.media') { + + my $caller = $json->{dialogParams}->{caller_id_number}; + my $callee = $json->{dialogParams}->{destination_number}; + my $caller_user = '@+' . $caller . ':' . $hs_domain; + my $callee_user = $msisdn_to_matrix->{$callee} || warn "unrecogised callee: $callee"; + my $room_id = $roomid_by_callid->{$json->{params}->{callID}}; + + if ($json->{params}->{sdp}) { + $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/send/m.call.answer?". + "access_token=$as_token&user_id=$caller_user" + ), + content_type => "application/json", + content => JSON->new->encode({ + call_id => $bridgestate->{$room_id}->{matrix_callid}, + version => 0, + answer => { + sdp => $json->{params}->{sdp}, + type => "answer", + }, + }), + )->then( sub { + send_verto_json_response( { + method => $json->{method}, + }, $json->{id}); + })->get; + } + } + elsif ($json->{method} eq 'verto.invite') { + my $caller = $json->{dialogParams}->{caller_id_number}; + my $callee = $json->{dialogParams}->{destination_number}; + my $caller_user = '@+' . $caller . ':' . $hs_domain; + my $callee_user = $msisdn_to_matrix->{$callee} || warn "unrecogised callee: $callee"; + + my $alias = ($caller lt $callee) ? ($caller.'-'.$callee) : ($callee.'-'.$caller); + my $room_id; + + # create a virtual user for the caller if needed. + create_virtual_user($caller); + + # create a room of form #peer-peer and invite the callee + $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/createRoom?". + "access_token=$as_token&user_id=$caller_user" + ), + content_type => "application/json", + content => JSON->new->encode({ + room_alias_name => $alias, + invite => [ $callee_user ], + }), + )->then( sub { + my ( $response ) = @_; + my $resp = JSON->new->decode($response->content); + $room_id = $resp->{room_id}; + $roomid_by_callid->{$json->{params}->{callID}} = $room_id; + })->get; + + # join it + my ($response) = $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/join/$room_id?". + "access_token=$as_token&user_id=$caller_user" + ), + content_type => "application/json", + content => '{}', + )->get; + + $bridgestate->{$room_id}->{matrix_callid} = lc new Data::UUID->create_str(); + $bridgestate->{$room_id}->{callid} = $json->{dialogParams}->{callID}; + $bridgestate->{$room_id}->{sessid} = $sessid; + + # put the m.call.invite in there + $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/send/m.call.invite?". + "access_token=$as_token&user_id=$caller_user" + ), + content_type => "application/json", + content => JSON->new->encode({ + call_id => $bridgestate->{$room_id}->{matrix_callid}, + version => 0, + answer => { + sdp => $json->{params}->{sdp}, + type => "offer", + }, + }), + )->then( sub { + # acknowledge the verto + send_verto_json_response( { + method => $json->{method}, + }, $json->{id}); + })->get; + } + elsif ($json->{method} eq 'verto.bye') { + my $caller = $json->{dialogParams}->{caller_id_number}; + my $callee = $json->{dialogParams}->{destination_number}; + my $caller_user = '@+' . $caller . ':' . $hs_domain; + my $callee_user = $msisdn_to_matrix->{$callee} || warn "unrecogised callee: $callee"; + my $room_id = $roomid_by_callid->{$json->{params}->{callID}}; + + # put the m.call.hangup into the room + $http->do_request( + method => "POST", + uri => URI->new( + $CONFIG{"matrix"}->{server}. + "/_matrix/client/api/v1/send/m.call.hangup?". + "access_token=$as_token&user_id=$caller_user" + ), + content_type => "application/json", + content => JSON->new->encode({ + call_id => $bridgestate->{$room_id}->{matrix_callid}, + version => 0, + }), + )->then( sub { + # acknowledge the verto + send_verto_json_response( { + method => $json->{method}, + }, $json->{id}); + })->get; + } + else { + warn ("[Verto] unhandled method: " . $json->{method}); + send_verto_json_response( { + method => $json->{method}, + }, $json->{id}); + } + } + elsif ($json->{result}) { + $requests->{$json->{id}}->done($json->{result}); + } + elsif ($json->{error}) { + $requests->{$json->{id}}->fail($json->{error}->{message}, $json->{error}); + } + } +} + |