#!/usr/bin/env python # # LADI Session Handler (ladish) # # Copyright (C) 2008, 2009, 2010, 2011 Nedko Arnaudov # #************************************************************************* # This file contains code of the commandline control app #************************************************************************* # # LADI Session Handler is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # LADI Session Handler is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with LADI Session Handler. If not, see # or write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. service_name = 'org.ladish' control_object_path = "/org/ladish/Control" studio_object_path = "/org/ladish/Studio" control_interface_name = 'org.ladish.Control' studio_interface_name = 'org.ladish.Studio' app_supervisor_interface_name = 'org.ladish.AppSupervisor' room_interface_name = 'org.ladish.Room' patchbay_interface_name = 'org.jackaudio.JackPatchbay' import sys import os import time from traceback import print_exc import dbus def bool_convert(str_value): if str_value.lower() == "false": return False if str_value.lower() == "off": return False if str_value.lower() == "no": return False if str_value == "0": return False if str_value.lower() == "(null)": return False return bool(str_value) def dbus_type_to_python_type(dbus_value): if type(dbus_value) == dbus.Boolean: return bool(dbus_value) if type(dbus_value) == dbus.Int32 or type(dbus_value) == dbus.UInt32: return int(dbus_value) return dbus_value def dbus_type_to_type_string(dbus_value): if type(dbus_value) == dbus.Boolean: return "bool" if type(dbus_value) == dbus.Int32: return "sint" if type(dbus_value) == dbus.UInt32: return "uint" if type(dbus_value) == dbus.Byte: return "char" if type(dbus_value) == dbus.String: return "str" return None # throw exception here? def dbus_typesig_to_type_string(type_char): type_char = str(type_char) if type_char == 'i': return "sint" if type_char == 'u': return "uint" if type_char == 'y': return "char" if type_char == 's': return "str" if type_char == 'b': return "bool" print('unknown dbus typesig') return None # throw exception here? def parse_new_app_args(params_array): #print params_array cmdline = params_array[0] index = 1 name = '' level = '0' term = False if index < len(params_array): if params_array[index] == '-': return index + 1, cmdline, name, level, term name = params_array[index] index += 1 if index < len(params_array): if params_array[index] == '-': return index + 1, cmdline, name, level, term # direct conversion to dbus.Byte is wrong because ord() is used ("1" -> 0x31 instead of "1" -> 0x01) level = params_array[index] index += 1 if index < len(params_array): if params_array[index] == '-': return index + 1, cmdline, name, level, term if params_array[index] == 'term': term = True index += 1 if index < len(params_array) and params_array[index] == '-': index += 1 return index, cmdline, name, level, term def add_app(obj, cmdline, name, level, term): dbus.Interface(obj, app_supervisor_interface_name).RunCustom(term, cmdline, name, level) def get_room_obj_by_name(bus, studio_iface, room_name): for room in studio_iface.GetRoomList(): #print repr(room) opath = room[0] name = room[1]["name"] if name == room_name: return bus.get_object(service_name, opath) def dump_graph(obj): patchbay_iface = dbus.Interface(obj, patchbay_interface_name) graph = patchbay_iface.GetGraph(0) for client in graph[1]: print '"%s"' % client[1] for port in client[2]: port_flags = port[2] if (port_flags & 1) != 0: port_flags = "input" elif (port_flags & 2) != 0: port_flags = "output" else: port_flags = "unknown" port_type = port[3] if port_type == 0: port_type = "audio" elif port_type == 1: port_type = "midi" else: port_type = "unknown" print ' "%s" [%s %s]' % (port[1], port_flags, port_type) print if len(graph[2]): if len(graph[2]) == 1: print "1 connection:" else: print "%u connections:" % len(graph[2]) for connection in graph[2]: print '"%s":"%s" -> "%s":"%s"' % (connection[1], connection[3], connection[5], connection[7]) else: print "0 connections." def main(): if len(sys.argv) == 1: argv0 = os.path.basename(sys.argv[0]) print("Usage: %s [command] [command] ..." % argv0) print("Commands:") print(" exit - exit ladish dbus service") print(" slist - list studios") print(" alist - list apps") print(" sload - load studio") print(" sdel - delete studio") print(" snew [studioname] - new studio") print(" sisloaded - is studio loaded?") print(" sname - get studio name") print(" ssave - save studio") print(" sunload - unload studio") print(" srename - rename studio") print(" sstart - start studio") print(" sstop - stop studio") print(" rtlist - list room templates") print(" rtdel - delete room template") print(" rtnew - create new room template") print(" snewroom - create new studio room") print(" srlist - list studio rooms") print(" sdelroom - delete studio room") print(" pload - load project into room") print(" punload - unload project from room") print(" psave - save project") print(" psaveas - save as project") print(" snewapp - add new app to studio (see below for more info)") print(" rnewapp - add new app to room (see below for more info)") print(" sgdump - studio graph dump") print(" rgdump - room graph dump") print(" sconnect - connect ports in studio"); print(" sdisconnect - disconnect ports in studio"); print(" rconnect - connect ports in studio"); print(" rdisconnect - disconnect ports in room"); print(""); print("Add new app arguments:"); print(" [ [] [term]] [-]"); print(""); print(" - the commandline to execut"); print(" - app name"); print(" - level, default is 0"); print(" term - if specified, app will be run in terminal"); print(" - - marks end of new app params, useful if there are other commands following"); print(""); print("Examples:"); print(""); print(" * Add to studio jack_mixer instance named \"mixer\", at level 1, without terminal"); print(""); print(" $ %s snewapp jack_mixer mixer 1" % argv0); print(""); print(" * Add to room \"main\" fluidjack instance named \"fluid\", at level 0, with terminal"); print(""); print(" $ %s rnewapp main \"fluidjack FluidR3.SF2\" fluid 0 term" % argv0); print(""); sys.exit(0) bus = dbus.SessionBus() control_obj = None studio_obj = None # check arguments index = 1 while index < len(sys.argv): arg = sys.argv[index] index += 1 try: if not control_obj: control_obj = bus.get_object(service_name, control_object_path) control_iface = dbus.Interface(control_obj, control_interface_name) if arg == "exit": print("--- exit") control_iface.Exit() time.sleep(1) # we have deactivated the object and we need to get new connection if there are more commands control_obj = None control_iface = None elif arg == 'slist': print("--- studio list") for studio in control_iface.GetStudioList(): name = studio[0] mtime = studio[1]['Modification Time'] print('"%s" last modified on %s' % (name, time.ctime(mtime))) elif arg == 'alist': print("--- app list") for app in control_iface.GetApplicationList(): print(app) elif arg == 'sload': print("--- studio load") if index >= len(sys.argv): print("load studio command requires studio name argument") sys.exit() arg = sys.argv[index] index += 1 open_options = {} #open_options["option1"] = "asd" #open_options["option2"] = True control_iface.LoadStudio(arg, open_options) elif arg == 'sdel': print("--- studio delete") if index >= len(sys.argv): print("delete studio command requires studio name argument") sys.exit() arg = sys.argv[index] index += 1 control_iface.DeleteStudio(arg) elif arg == 'snew': print("--- studio new") name = "" if index < len(sys.argv): name = sys.argv[index] index += 1 control_iface.NewStudio(name) elif arg == 'sisloaded': print("--- studio is loaded") if control_iface.IsStudioLoaded(): print("yes") else: print("no") elif arg == 'rtlist': print("--- list room templates") for studio in control_iface.GetRoomTemplateList(): name = studio[0] print('"%s"' % name) elif arg == 'rtnew': print("--- create new room template") if index >= len(sys.argv): print("create new room template command requires room template name argument") sys.exit() arg = sys.argv[index] index += 1 control_iface.CreateRoomTemplate(arg) elif arg == 'rtdel': print("--- delete room template") if index >= len(sys.argv): print("delete room template command requires room template name argument") sys.exit() arg = sys.argv[index] index += 1 control_iface.DeleteRoomTemplate(arg) else: if not studio_obj: studio_obj = bus.get_object(service_name, studio_object_path) studio_iface = dbus.Interface(studio_obj, studio_interface_name) if arg == 'sname': print("--- studio get name") print("\"%s\"" % studio_iface.GetName()) elif arg == 'ssave': print("--- studio save") studio_iface.Save() elif arg == 'sunload': print("--- studio unload") studio_iface.Unload() studio_obj = None studio_iface = None elif arg == 'srename': print("--- studio rename") if index >= len(sys.argv): print("rename studio command requires studio name argument") sys.exit() arg = sys.argv[index] index += 1 studio_iface.Rename(arg) elif arg == 'sstart': print("--- studio start") studio_iface.Start() elif arg == 'sstop': print("--- studio stop") studio_iface.Stop() elif arg == 'snewroom': print("--- create new studio room") if index + 1 >= len(sys.argv): print("creation of studio room requires room name and room template name arguments") sys.exit() room_name = sys.argv[index] index += 1 room_template_name = sys.argv[index] index += 1 studio_iface.CreateRoom(room_name, room_template_name) elif arg == 'srlist': print("--- list studio rooms") for room in studio_iface.GetRoomList(): #print repr(room) opath = room[0] name = room[1]["name"] if room[1].has_key("template"): template = str(room[1]["template"]) else: template = None if template: print('"%s" from template "%s" (%s)' % (name, template, opath)) else: print('"%s" (%s)' % (name, opath)) elif arg == 'sdelroom': print("--- delete studio room") if index >= len(sys.argv): print("delete studio room command requires room name argument") sys.exit() arg = sys.argv[index] index += 1 studio_iface.DeleteRoom(arg) elif arg == 'pload': print("--- load project") if index + 1 >= len(sys.argv): print("load project command requires room name and project dir arguments") sys.exit() room_name = sys.argv[index] index += 1 project_dir = sys.argv[index] index += 1 dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).LoadProject(project_dir) elif arg == 'punload': print("--- unload project") if index >= len(sys.argv): print("load project command requires room name argument") sys.exit() room_name = sys.argv[index] index += 1 dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).UnloadProject() elif arg == 'psave': print("--- save project") if index >= len(sys.argv): print("save project command requires room name argument") sys.exit() room_name = sys.argv[index] index += 1 dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).SaveProject("", "") elif arg == 'psaveas': print("--- save project as") if index + 2 >= len(sys.argv): print("save project as command requires room name, project dir and project name arguments") sys.exit() room_name = sys.argv[index] index += 1 project_dir = sys.argv[index] index += 1 project_name = sys.argv[index] index += 1 dbus.Interface(get_room_obj_by_name(bus, studio_iface, room_name), room_interface_name).SaveProject(project_dir, project_name) elif arg == 'snewapp': print("--- new studio app") count, cmdline, name, level, term = parse_new_app_args(sys.argv[index:]) index += count add_app(studio_obj, cmdline, name, level, term) elif arg == 'rnewapp': print("--- new room app") arg = sys.argv[index] index += 1 count, cmdline, name, level, term = parse_new_app_args(sys.argv[index:]) index += count add_app(get_room_obj_by_name(bus, studio_iface, arg), cmdline, name, level, term) elif arg == "sgdump": print('--- dump studio graph') dump_graph(studio_obj) elif arg == "rgdump": if index >= len(sys.argv): print("rgdump command requires room name argument") sys.exit() rname = sys.argv[index] index += 1 print('--- dump room "' + rname + '" graph') room_obj = get_room_obj_by_name(bus, studio_obj, rname) dump_graph(room_obj) elif arg == "sconnect": if index + 4 > len(sys.argv): print("sconnect command requires client1, port1, client2 and port2 arguments") sys.exit() c1 = sys.argv[index] p1 = sys.argv[index + 1] c2 = sys.argv[index + 2] p2 = sys.argv[index + 3] index += 4 print('--- connect studio port "' + c1 + '":"' + p1 + '" to "' + c2 + '":"' + p2 + '"') patchbay_iface = dbus.Interface(studio_obj, patchbay_interface_name) patchbay_iface.ConnectPortsByName(c1, p1, c2, p2) elif arg == "sdisconnect": if index + 4 > len(sys.argv): print("sdisconnect command requires client1, port1, client2 and port2 arguments") sys.exit() c1 = sys.argv[index] p1 = sys.argv[index + 1] c2 = sys.argv[index + 2] p2 = sys.argv[index + 3] index += 4 print('--- disconnect studio port "' + c2 + '":"' + p2 + '" from "' + c1 + '":"' + p1 + '"') patchbay_iface = dbus.Interface(studio_obj, patchbay_interface_name) patchbay_iface.DisconnectPortsByName(c1, p1, c2, p2) elif arg == "rconnect": if index + 5 > len(sys.argv): print("rconnect command requires rname, client1, port1, client2 and port2 arguments") sys.exit() rname = sys.argv[index] c1 = sys.argv[index + 1] p1 = sys.argv[index + 2] c2 = sys.argv[index + 3] p2 = sys.argv[index + 4] index += 5 print('--- connect room "' + rname + '" port "' + c1 + '":"' + p1 + '" to "' + c2 + '":"' + p2 + '"') room_obj = get_room_obj_by_name(bus, studio_iface, rname) patchbay_iface = dbus.Interface(room_obj, patchbay_interface_name) patchbay_iface.ConnectPortsByName(c1, p1, c2, p2) elif arg == "rdisconnect": if index + 5 > len(sys.argv): print("rdisconnect command requires rname, client1, port1, client2 and port2 arguments") sys.exit() rname = sys.argv[index] c1 = sys.argv[index + 1] p1 = sys.argv[index + 2] c2 = sys.argv[index + 3] p2 = sys.argv[index + 4] index += 5 print('--- disconnect room "' + rname + '" port "' + c2 + '":"' + p2 + '" from "' + c1 + '":"' + p1 + '"') room_obj = get_room_obj_by_name(bus, studio_iface, rname) patchbay_iface = dbus.Interface(room_obj, patchbay_interface_name) patchbay_iface.DisconnectPortsByName(c1, p1, c2, p2) else: print("Unknown command '%s'" % arg) except dbus.DBusException, e: print("DBus exception: %s" % str(e)) if __name__ == '__main__': main()