#!/usr/bin/python # LADITools - Linux Audio Desktop Integration Tools # g15ladi - A jack monitor for the g15 keyboard that uses the dbus interface # Copyright (C) 2007-2010, Marc-Olivier Barre # Copyright (C) 2007-2009, Nedko Arnaudov # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import sys import os import time import signal try: import imp imp.find_module('laditools') except ImportError: # Running from the build tree? sys.path.insert(0, os.path.join(sys.path[0], os.pardir)) import laditools pipe_filename = "/dev/shm/g15log.pipe" class g15composer: text_justify_left = 0 text_justify_center = 1 text_justify_right = 2 text_size_small = 0 #text_size_medium = 1 text_size_large = 2 width = 160 height = 43 def __init__(self, pipe_filename, large): if large: self.size = self.text_size_large self.max_lines = 5 # 7 for small, 5 for large self.max_chars = 20 else: self.size = self.text_size_small self.max_lines = 7 self.max_chars = 40 self.pipe_filename = pipe_filename self.pid = os.spawnlp(os.P_NOWAIT, 'g15composer', 'g15composer', self.pipe_filename) #print "g15composer started" while not os.access(self.pipe_filename, os.F_OK): #print "Waiting pipe to appear..." time.sleep(1) self.pipe = file(self.pipe_filename, "w") os.remove(self.pipe_filename) #print "pipe unlinked" def __del__(self): self.pipe.close() #print "stopping g15composer..." os.kill(self.pid, signal.SIGTERM) os.waitpid(self.pid, 0) #print "g15composer stopped" def send_message(self, message): #print message, self.pipe.write(message) self.pipe.flush() def draw_lines(self, lines, x, y, justify): message = 'TO %u %u %u %u' % (x, y, self.size, justify) for line in lines: message += ' "%s "' % line.replace('"', "'").strip("\n") message += "\n" self.send_message(message) def draw_lines(self, lines, x, y, justify): message = 'TO %u %u %u %u' % (x, y, self.size, justify) for line in lines: message += ' "%s "' % line.replace('"', "'").strip("\n") message += "\n" self.send_message(message) def clear(self, ink): if ink: message = 'PC 1\n' else: message = 'PC 0\n' self.send_message(message) def draw_bar(self, x, y, width, height, ink, fill): if width < 0: width = self.width - width - x - 1 else: if width < 5: width = 5 if height < 3: height = 3 width -= 1 height -= 3 y += 1 if ink: ink = 1 else: ink = 0 self.send_message("DB %u %u %u %u %u %u %u 1\n" % (x, y, x + width, y + height, ink, fill * 1000, 1000)) class jackctl_g15: def __init__(self): self.jack = None self.g15 = g15composer(pipe_filename, False) self.started = False self.ladish = None def get_jack_info(self): if not self.jack: self.jack = laditools.jack_controller() if not self.jack.is_started(): return False, None, None, None, None, None realtime = self.jack.is_realtime() load = self.jack.get_load() xruns = self.jack.get_xruns() if xruns == 0: xruns_line = "no xruns" elif xruns == 1: xruns_line = "1 xrun" else: xruns_line = "%u xruns" % xruns rate = self.jack.get_sample_rate() if rate % 1000.0 == 0: rate_line = "Sample rate: %.0f Khz" % (rate / 1000.0) else: rate_line = "Sample rate: %.1f Khz" % (rate / 1000.0) latency_line = "Latency: %.1f ms" % self.jack.get_latency() return True, realtime, load, xruns_line, rate_line, latency_line def update(self): try: studio_name = None if not self.ladish: try: self.ladish = laditools.ladish_proxy() except: pass try: studio_name = self.ladish.studio_name() except: pass started, realtime, load, xruns_line, rate_line, latency_line = self.get_jack_info() if started: self.started = True elif self.started: self.g15.clear(False) self.started = False if self.ladish: if studio_name: status_line = '"' + studio_name + '"' if started: status_line += " started" if realtime: status_line += " (RT)" else: status_line += " stopped" else: if started: status_line = "ladish is sick, jack started w/o studio" else: status_line = "No studio loaded" else: status_line = "JACK" if started: if realtime: status_line += " started in realtime mode" else: status_line += " started in non-realtime mode" else: status_line += " stopped" except Exception as e: print(repr(e)) status_line = "JACK cannot be connected" if self.jack: self.g15.clear(False) self.jack = None self.started = False self.g15.draw_lines([status_line], 0, 0, self.g15.text_justify_left) if self.started: self.g15.draw_lines(["DSP load"], 0, 12, self.g15.text_justify_left) self.g15.draw_bar(35, 12, 95, 5, True, load / 100.0) self.g15.draw_lines(["%.3f%%" % load], 135, 12, self.g15.text_justify_left) self.g15.draw_lines([xruns_line], 0, 22, self.g15.text_justify_left) self.g15.draw_lines([rate_line, latency_line], 0, 31, self.g15.text_justify_left) def run(self): while True: self.update() time.sleep(1) del self.g15 jackctl_g15().run()