ladish/ladish_control

530 lines
22 KiB
Python
Executable File

#!/usr/bin/env python
#
# LADI Session Handler (ladish)
#
# Copyright (C) 2008, 2009, 2010, 2011 Nedko Arnaudov <nedko@arnaudov.name>
#
#*************************************************************************
# This file contains code of the commandline control app
#*************************************************************************
#
# LADI Session Handler is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# LADI Session Handler is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with LADI Session Handler. If not, see <http://www.gnu.org/licenses/>
# or write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
service_name = 'org.ladish'
control_object_path = "/org/ladish/Control"
studio_object_path = "/org/ladish/Studio"
control_interface_name = 'org.ladish.Control'
studio_interface_name = 'org.ladish.Studio'
app_supervisor_interface_name = 'org.ladish.AppSupervisor'
room_interface_name = 'org.ladish.Room'
patchbay_interface_name = 'org.jackaudio.JackPatchbay'
import sys
import os
import time
from traceback import print_exc
import dbus
def bool_convert(str_value):
if str_value.lower() == "false":
return False
if str_value.lower() == "off":
return False
if str_value.lower() == "no":
return False
if str_value == "0":
return False
if str_value.lower() == "(null)":
return False
return bool(str_value)
def dbus_type_to_python_type(dbus_value):
if type(dbus_value) == dbus.Boolean:
return bool(dbus_value)
if type(dbus_value) == dbus.Int32 or type(dbus_value) == dbus.UInt32:
return int(dbus_value)
return dbus_value
def dbus_type_to_type_string(dbus_value):
if type(dbus_value) == dbus.Boolean:
return "bool"
if type(dbus_value) == dbus.Int32:
return "sint"
if type(dbus_value) == dbus.UInt32:
return "uint"
if type(dbus_value) == dbus.Byte:
return "char"
if type(dbus_value) == dbus.String:
return "str"
return None # throw exception here?
def dbus_typesig_to_type_string(type_char):
type_char = str(type_char)
if type_char == 'i':
return "sint"
if type_char == 'u':
return "uint"
if type_char == 'y':
return "char"
if type_char == 's':
return "str"
if type_char == 'b':
return "bool"
print('unknown dbus typesig')
return None # throw exception here?
def parse_new_app_args(params_array):
#print params_array
cmdline = params_array[0]
index = 1
name = ''
level = '0'
term = False
if index < len(params_array):
if params_array[index] == '-':
return index + 1, cmdline, name, level, term
name = params_array[index]
index += 1
if index < len(params_array):
if params_array[index] == '-':
return index + 1, cmdline, name, level, term
# direct conversion to dbus.Byte is wrong because ord() is used ("1" -> 0x31 instead of "1" -> 0x01)
level = params_array[index]
index += 1
if index < len(params_array):
if params_array[index] == '-':
return index + 1, cmdline, name, level, term
if params_array[index] == 'term':
term = True
index += 1
if index < len(params_array) and params_array[index] == '-':
index += 1
return index, cmdline, name, level, term
def add_app(obj, cmdline, name, level, term):
dbus.Interface(obj, app_supervisor_interface_name).RunCustom(term, cmdline, name, level)
def get_room_obj_by_name(bus, studio_iface, room_name):
for room in studio_iface.GetRoomList():
#print repr(room)
opath = room[0]
name = room[1]["name"]
if name == room_name:
return bus.get_object(service_name, opath)
def dump_graph(obj):
patchbay_iface = dbus.Interface(obj, patchbay_interface_name)
graph = patchbay_iface.GetGraph(0)
for client in graph[1]:
print '"%s"' % client[1]
for port in client[2]:
port_flags = port[2]
if (port_flags & 1) != 0:
port_flags = "input"
elif (port_flags & 2) != 0:
port_flags = "output"
else:
port_flags = "unknown"
port_type = port[3]
if port_type == 0:
port_type = "audio"
elif port_type == 1:
port_type = "midi"
else:
port_type = "unknown"
print ' "%s" [%s %s]' % (port[1], port_flags, port_type)
print
if len(graph[2]):
if len(graph[2]) == 1:
print "1 connection:"
else:
print "%u connections:" % len(graph[2])
for connection in graph[2]:
print '"%s":"%s" -> "%s":"%s"' % (connection[1], connection[3], connection[5], connection[7])
else:
print "0 connections."
def main():
if len(sys.argv) == 1:
argv0 = os.path.basename(sys.argv[0])
print("Usage: %s [command] [command] ..." % argv0)
print("Commands:")
print(" exit - exit ladish dbus service")
print(" slist - list studios")
print(" alist - list apps")
print(" sload <studioname> - load studio")
print(" sdel <studioname> - delete studio")
print(" snew [studioname] - new studio")
print(" sisloaded - is studio loaded?")
print(" sname - get studio name")
print(" ssave - save studio")
print(" sunload - unload studio")
print(" srename <studioname> - rename studio")
print(" sstart - start studio")
print(" sstop - stop studio")
print(" rtlist - list room templates")
print(" rtdel <roomtemplatename> - delete room template")
print(" rtnew <roomtemplatename> - create new room template")
print(" snewroom <rname> <rtname> - create new studio room")
print(" srlist - list studio rooms")
print(" sdelroom <rname> - delete studio room")
print(" pload <rname> <proj_dir> - load project into room")
print(" punload <rname> - unload project from room")
print(" psave <rname> - save project")
print(" psaveas <rname> <proj_dir> <proj_name> - save as project")
print(" snewapp <appargs> - add new app to studio (see below for more info)")
print(" rnewapp <rname> <appargs> - add new app to room (see below for more info)")
print(" sgdump - studio graph dump")
print(" rgdump <rname> - room graph dump")
print(" sconnect <client1> <port1> <client2> <port2> - connect ports in studio");
print(" sdisconnect <client1> <port1> <client2> <port2> - disconnect ports in studio");
print(" rconnect <rname> <client1> <port1> <client2> <port2> - connect ports in studio");
print(" rdisconnect <rname> <client1> <port1> <client2> <port2> - disconnect ports in room");
print("");
print("Add new app arguments:");
print(" <commandline> [<name> [<level>] [term]] [-]");
print("");
print(" <commandline> - the commandline to execut");
print(" <name> - app name");
print(" <level> - level, default is 0");
print(" term - if specified, app will be run in terminal");
print(" - - marks end of new app params, useful if there are other commands following");
print("");
print("Examples:");
print("");
print(" * Add to studio jack_mixer instance named \"mixer\", at level 1, without terminal");
print("");
print(" $ %s snewapp jack_mixer mixer 1" % argv0);
print("");
print(" * Add to room \"main\" fluidjack instance named \"fluid\", at level 0, with terminal");
print("");
print(" $ %s rnewapp main \"fluidjack FluidR3.SF2\" fluid 0 term" % argv0);
print("");
sys.exit(0)
bus = dbus.SessionBus()
control_obj = None
studio_obj = None
# check arguments
index = 1
while index < len(sys.argv):
arg = sys.argv[index]
index += 1
try:
if not control_obj:
control_obj = bus.get_object(service_name, control_object_path)
control_iface = dbus.Interface(control_obj, control_interface_name)
if arg == "exit":
print("--- exit")
control_iface.Exit()
time.sleep(1)
# we have deactivated the object and we need to get new connection if there are more commands
control_obj = None
control_iface = None
elif arg == 'slist':
print("--- studio list")
for studio in control_iface.GetStudioList():
name = studio[0]
mtime = studio[1]['Modification Time']
print('"%s" last modified on %s' % (name, time.ctime(mtime)))
elif arg == 'alist':
print("--- app list")
for app in control_iface.GetApplicationList():
print(app)
elif arg == 'sload':
print("--- studio load")
if index >= len(sys.argv):
print("load studio command requires studio name argument")
sys.exit()
arg = sys.argv[index]
index += 1
open_options = {}
#open_options["option1"] = "asd"
#open_options["option2"] = True
control_iface.LoadStudio(arg, open_options)
elif arg == 'sdel':
print("--- studio delete")
if index >= len(sys.argv):
print("delete studio command requires studio name argument")
sys.exit()
arg = sys.argv[index]
index += 1
control_iface.DeleteStudio(arg)
elif arg == 'snew':
print("--- studio new")
name = ""
if index < len(sys.argv):
name = sys.argv[index]
index += 1
control_iface.NewStudio(name)
elif arg == 'sisloaded':
print("--- studio is loaded")
if control_iface.IsStudioLoaded():
print("yes")
else:
print("no")
elif arg == 'rtlist':
print("--- list room templates")
for studio in control_iface.GetRoomTemplateList():
name = studio[0]
print('"%s"' % name)
elif arg == 'rtnew':
print("--- create new room template")
if index >= len(sys.argv):
print("create new room template command requires room template name argument")
sys.exit()
arg = sys.argv[index]
index += 1
control_iface.CreateRoomTemplate(arg)
elif arg == 'rtdel':
print("--- delete room template")
if index >= len(sys.argv):
print("delete room template command requires room template name argument")
sys.exit()
arg = sys.argv[index]
index += 1
control_iface.DeleteRoomTemplate(arg)
else:
if not studio_obj:
studio_obj = bus.get_object(service_name, studio_object_path)
studio_iface = dbus.Interface(studio_obj, studio_interface_name)
if arg == 'sname':
print("--- studio get name")
print("\"%s\"" % studio_iface.GetName())
elif arg == 'ssave':
print("--- studio save")
studio_iface.Save()
elif arg == 'sunload':
print("--- studio unload")
studio_iface.Unload()
studio_obj = None
studio_iface = None
elif arg == 'srename':
print("--- studio rename")
if index >= len(sys.argv):
print("rename studio command requires studio name argument")
sys.exit()
arg = sys.argv[index]
index += 1
studio_iface.Rename(arg)
elif arg == 'sstart':
print("--- studio start")
studio_iface.Start()
elif arg == 'sstop':
print("--- studio stop")
studio_iface.Stop()
elif arg == 'snewroom':
print("--- create new studio room")
if index + 1 >= len(sys.argv):
print("creation of studio room requires room name and room template name arguments")
sys.exit()
room_name = sys.argv[index]
index += 1
room_template_name = sys.argv[index]
index += 1
studio_iface.CreateRoom(room_name, room_template_name)
elif arg == 'srlist':
print("--- list studio rooms")
for room in studio_iface.GetRoomList():
#print repr(room)
opath = room[0]
name = room[1]["name"]
if room[1].has_key("template"):
template = str(room[1]["template"])
else:
template = None
if template:
print('"%s" from template "%s" (%s)' % (name, template, opath))
else:
print('"%s" (%s)' % (name, opath))
elif arg == 'sdelroom':
print("--- delete studio room")
if index >= len(sys.argv):
print("delete studio room command requires room name argument")
sys.exit()
arg = sys.argv[index]
index += 1
studio_iface.DeleteRoom(arg)
elif arg == 'pload':
print("--- load project")
if index + 1 >= len(sys.argv):
print("load project command requires room name and project dir arguments")
sys.exit()
room_name = sys.argv[index]
index += 1
project_dir = sys.argv[index]
index += 1
dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).LoadProject(project_dir)
elif arg == 'punload':
print("--- unload project")
if index >= len(sys.argv):
print("load project command requires room name argument")
sys.exit()
room_name = sys.argv[index]
index += 1
dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).UnloadProject()
elif arg == 'psave':
print("--- save project")
if index >= len(sys.argv):
print("save project command requires room name argument")
sys.exit()
room_name = sys.argv[index]
index += 1
dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).SaveProject("", "")
elif arg == 'psaveas':
print("--- save project as")
if index + 2 >= len(sys.argv):
print("save project as command requires room name, project dir and project name arguments")
sys.exit()
room_name = sys.argv[index]
index += 1
project_dir = sys.argv[index]
index += 1
project_name = sys.argv[index]
index += 1
dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).SaveProject(project_dir, project_name)
elif arg == 'snewapp':
print("--- new studio app")
count, cmdline, name, level, term = parse_new_app_args(sys.argv[index:])
index += count
add_app(studio_obj, cmdline, name, level, term)
elif arg == 'rnewapp':
print("--- new room app")
arg = sys.argv[index]
index += 1
count, cmdline, name, level, term = parse_new_app_args(sys.argv[index:])
index += count
add_app(get_room_obj_by_name(bus, studio_iface, arg), cmdline, name, level, term)
elif arg == "sgdump":
print('--- dump studio graph')
dump_graph(studio_obj)
elif arg == "rgdump":
if index >= len(sys.argv):
print("rconnect command requires rname, client1, port1, client2 and port2 arguments")
sys.exit()
rname = sys.argv[index]
index += 1
print('--- dump room "' + rname + '" graph')
room_obj = get_room_obj_by_name(bus, studio_obj, rname)
dump_graph(room_obj)
elif arg == "sconnect":
if index + 4 > len(sys.argv):
print("sconnect command requires client1, port1, client2 and port2 arguments")
sys.exit()
c1 = sys.argv[index]
p1 = sys.argv[index + 1]
c2 = sys.argv[index + 2]
p2 = sys.argv[index + 3]
index += 4
print('--- connect studio port "' + c1 + '":"' + p1 + '" to "' + c2 + '":"' + p2 + '"')
patchbay_iface = dbus.Interface(studio_obj, patchbay_interface_name)
patchbay_iface.ConnectPortsByName(c1, p1, c2, p2)
elif arg == "sdisconnect":
if index + 4 > len(sys.argv):
print("sdisconnect command requires client1, port1, client2 and port2 arguments")
sys.exit()
c1 = sys.argv[index]
p1 = sys.argv[index + 1]
c2 = sys.argv[index + 2]
p2 = sys.argv[index + 3]
index += 4
print('--- disconnect studio port "' + c2 + '":"' + p2 + '" from "' + c1 + '":"' + p1 + '"')
patchbay_iface = dbus.Interface(studio_obj, patchbay_interface_name)
patchbay_iface.DisconnectPortsByName(c1, p1, c2, p2)
elif arg == "rconnect":
if index + 5 > len(sys.argv):
print("rconnect command requires rname, client1, port1, client2 and port2 arguments")
sys.exit()
rname = sys.argv[index]
c1 = sys.argv[index + 1]
p1 = sys.argv[index + 2]
c2 = sys.argv[index + 3]
p2 = sys.argv[index + 4]
index += 5
print('--- connect room "' + rname + '" port "' + c1 + '":"' + p1 + '" to "' + c2 + '":"' + p2 + '"')
room_obj = get_room_obj_by_name(bus, studio_iface, rname)
patchbay_iface = dbus.Interface(room_obj, patchbay_interface_name)
patchbay_iface.ConnectPortsByName(c1, p1, c2, p2)
elif arg == "rdisconnect":
if index + 5 > len(sys.argv):
print("rdisconnect command requires rname, client1, port1, client2 and port2 arguments")
sys.exit()
rname = sys.argv[index]
c1 = sys.argv[index + 1]
p1 = sys.argv[index + 2]
c2 = sys.argv[index + 3]
p2 = sys.argv[index + 4]
index += 5
print('--- disconnect room "' + rname + '" port "' + c2 + '":"' + p2 + '" from "' + c1 + '":"' + p1 + '"')
room_obj = get_room_obj_by_name(bus, studio_iface, rname)
patchbay_iface = dbus.Interface(room_obj, patchbay_interface_name)
patchbay_iface.DisconnectPortsByName(c1, p1, c2, p2)
else:
print("Unknown command '%s'" % arg)
except dbus.DBusException, e:
print("DBus exception: %s" % str(e))
if __name__ == '__main__':
main()