Factored out Thread (and Slave, an explicitly signal-driven thread)

git-svn-id: http://svn.drobilla.net/lad@87 a436a847-0d15-0410-975c-d299462d15a1
dave 2006-07-12 06:34:30 +00:00
parent 44dd31f131
commit ac122e4106
15 changed files with 391 additions and 309 deletions

View File

@@ -27,7 +27,7 @@
* work in GDB. Turns out sem_wait can fail when run in GDB, and Debian
* really needs to update its man pages.
*
* This class remains as a pretty wrapper/abstraction that does nothing.
* This class remains as a trivial (yet pretty) wrapper/abstraction.
*/
class Semaphore {
public:

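For context, the wrapper amounts to little more than the following sketch (the real util/Semaphore.h may differ; the retry-on-failure loop is an assumption about how the GDB problem described above is worked around):

#include <semaphore.h>

class Semaphore {
public:
	Semaphore(unsigned int initial = 0) { sem_init(&_sem, 0, initial); }
	~Semaphore()                        { sem_destroy(&_sem); }

	inline void post() { sem_post(&_sem); }

	// Retry if sem_wait is interrupted (the failure mode seen under GDB)
	inline void wait() { while (sem_wait(&_sem) != 0) ; }

private:
	sem_t _sem;
};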
View File

@@ -35,7 +35,7 @@ public:
virtual ~EventSource() {}
virtual Event* pop_earliest_event_before(const samplecount time) = 0;
virtual Event* pop_earliest_before(const samplecount time) = 0;
virtual void start() = 0;

View File

@@ -281,7 +281,7 @@ JackAudioDriver::process_events(jack_nframes_t block_start, jack_nframes_t block
// FIXME
while ((ev = reinterpret_cast<EventSource*>(om->osc_receiver())
->pop_earliest_event_before(block_end)) != NULL) {
->pop_earliest_before(block_end)) != NULL) {
ev->execute(0); // QueuedEvents are not sample accurate
om->post_processor()->push(ev);
if (++num_events_processed > MAX_SLOW_EVENTS)

View File

@@ -28,7 +28,7 @@
*
* cleanup() is meant to be called periodically to free memory, often
* enough to prevent the queue from overflowing. This is done by the
* main thread, in OmApp.
* main thread (in OmApp.cpp) since it has nothing better to do.
*
* \ingroup engine
*/

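A hedged sketch of the call pattern described above (the class and flag names are illustrative; only maid()/cleanup() come from the comment and the surrounding code):

#include <unistd.h>

// Hypothetical main-thread loop, as in OmApp.cpp:
void cleanup_loop(Maid* maid, const bool& quit_flag)
{
	while (!quit_flag) {
		maid->cleanup();  // free everything the realtime threads have discarded
		usleep(10000);    // often enough that the trash queue cannot overflow
	}
}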
View File

@@ -73,6 +73,9 @@ libingen_la_SOURCES = \
Plugin.h \
Array.h \
List.h \
Slave.h \
Thread.h \
Thread.cpp \
PostProcessor.h \
PostProcessor.cpp \
Connection.h \

View File

@@ -52,76 +52,75 @@ using Shared::ClientKey;
OSCReceiver::OSCReceiver(size_t queue_size, const char* const port)
: QueuedEngineInterface(queue_size),
_port(port),
_is_activated(false),
_st(NULL),
_server(NULL),
_osc_responder(NULL)
{
_st = lo_server_thread_new(port, error_cb);
_server = lo_server_new(port, error_cb);
if (_st == NULL) {
if (_server == NULL) {
cerr << "[OSC] Could not start OSC server. Aborting." << endl;
exit(EXIT_FAILURE);
} else {
char* lo_url = lo_server_thread_get_url(_st);
char* lo_url = lo_server_get_url(_server);
cout << "[OSC] Started OSC server at " << lo_url << endl;
free(lo_url);
}
// For debugging, print all incoming OSC messages
lo_server_thread_add_method(_st, NULL, NULL, generic_cb, NULL);
lo_server_add_method(_server, NULL, NULL, generic_cb, NULL);
// Set response address for this message.
// It's important this is first and returns nonzero.
lo_server_thread_add_method(_st, NULL, NULL, set_response_address_cb, this);
lo_server_add_method(_server, NULL, NULL, set_response_address_cb, this);
// Commands
lo_server_thread_add_method(_st, "/om/ping", "i", ping_cb, this);
lo_server_thread_add_method(_st, "/om/ping_slow", "i", ping_slow_cb, this);
lo_server_thread_add_method(_st, "/om/engine/quit", "i", quit_cb, this);
//lo_server_thread_add_method(_st, "/om/engine/register_client", "is", register_client_cb, this);
lo_server_thread_add_method(_st, "/om/engine/register_client", "i", register_client_cb, this);
lo_server_thread_add_method(_st, "/om/engine/unregister_client", "i", unregister_client_cb, this);
lo_server_thread_add_method(_st, "/om/engine/load_plugins", "i", load_plugins_cb, this);
lo_server_thread_add_method(_st, "/om/engine/activate", "i", engine_activate_cb, this);
lo_server_thread_add_method(_st, "/om/engine/deactivate", "i", engine_deactivate_cb, this);
lo_server_thread_add_method(_st, "/om/synth/create_patch", "isi", create_patch_cb, this);
lo_server_thread_add_method(_st, "/om/synth/enable_patch", "is", enable_patch_cb, this);
lo_server_thread_add_method(_st, "/om/synth/disable_patch", "is", disable_patch_cb, this);
lo_server_thread_add_method(_st, "/om/synth/clear_patch", "is", clear_patch_cb, this);
lo_server_thread_add_method(_st, "/om/synth/create_port", "issi", create_port_cb, this);
lo_server_thread_add_method(_st, "/om/synth/create_node", "issssi", create_node_cb, this);
lo_server_thread_add_method(_st, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this);
lo_server_thread_add_method(_st, "/om/synth/destroy", "is", destroy_cb, this);
lo_server_thread_add_method(_st, "/om/synth/rename", "iss", rename_cb, this);
lo_server_thread_add_method(_st, "/om/synth/connect", "iss", connect_cb, this);
lo_server_thread_add_method(_st, "/om/synth/disconnect", "iss", disconnect_cb, this);
lo_server_thread_add_method(_st, "/om/synth/disconnect_all", "is", disconnect_all_cb, this);
lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isf", set_port_value_cb, this);
lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this);
lo_server_thread_add_method(_st, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this);
lo_server_thread_add_method(_st, "/om/synth/note_on", "isii", note_on_cb, this);
lo_server_thread_add_method(_st, "/om/synth/note_off", "isi", note_off_cb, this);
lo_server_thread_add_method(_st, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this);
lo_server_thread_add_method(_st, "/om/synth/midi_learn", "is", midi_learn_cb, this);
lo_server_add_method(_server, "/om/ping", "i", ping_cb, this);
lo_server_add_method(_server, "/om/ping_slow", "i", ping_slow_cb, this);
lo_server_add_method(_server, "/om/engine/quit", "i", quit_cb, this);
//lo_server_add_method(_server, "/om/engine/register_client", "is", register_client_cb, this);
lo_server_add_method(_server, "/om/engine/register_client", "i", register_client_cb, this);
lo_server_add_method(_server, "/om/engine/unregister_client", "i", unregister_client_cb, this);
lo_server_add_method(_server, "/om/engine/load_plugins", "i", load_plugins_cb, this);
lo_server_add_method(_server, "/om/engine/activate", "i", engine_activate_cb, this);
lo_server_add_method(_server, "/om/engine/deactivate", "i", engine_deactivate_cb, this);
lo_server_add_method(_server, "/om/synth/create_patch", "isi", create_patch_cb, this);
lo_server_add_method(_server, "/om/synth/enable_patch", "is", enable_patch_cb, this);
lo_server_add_method(_server, "/om/synth/disable_patch", "is", disable_patch_cb, this);
lo_server_add_method(_server, "/om/synth/clear_patch", "is", clear_patch_cb, this);
lo_server_add_method(_server, "/om/synth/create_port", "issi", create_port_cb, this);
lo_server_add_method(_server, "/om/synth/create_node", "issssi", create_node_cb, this);
lo_server_add_method(_server, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this);
lo_server_add_method(_server, "/om/synth/destroy", "is", destroy_cb, this);
lo_server_add_method(_server, "/om/synth/rename", "iss", rename_cb, this);
lo_server_add_method(_server, "/om/synth/connect", "iss", connect_cb, this);
lo_server_add_method(_server, "/om/synth/disconnect", "iss", disconnect_cb, this);
lo_server_add_method(_server, "/om/synth/disconnect_all", "is", disconnect_all_cb, this);
lo_server_add_method(_server, "/om/synth/set_port_value", "isf", set_port_value_cb, this);
lo_server_add_method(_server, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this);
lo_server_add_method(_server, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this);
lo_server_add_method(_server, "/om/synth/note_on", "isii", note_on_cb, this);
lo_server_add_method(_server, "/om/synth/note_off", "isi", note_off_cb, this);
lo_server_add_method(_server, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this);
lo_server_add_method(_server, "/om/synth/midi_learn", "is", midi_learn_cb, this);
#ifdef HAVE_LASH
lo_server_thread_add_method(_st, "/om/lash/restore_finished", "i", lash_restore_done_cb, this);
lo_server_add_method(_server, "/om/lash/restore_finished", "i", lash_restore_done_cb, this);
#endif
lo_server_thread_add_method(_st, "/om/metadata/request", "isss", metadata_get_cb, this);
lo_server_thread_add_method(_st, "/om/metadata/set", "isss", metadata_set_cb, this);
lo_server_add_method(_server, "/om/metadata/request", "isss", metadata_get_cb, this);
lo_server_add_method(_server, "/om/metadata/set", "isss", metadata_set_cb, this);
// Queries
lo_server_thread_add_method(_st, "/om/request/plugins", "i", request_plugins_cb, this);
lo_server_thread_add_method(_st, "/om/request/all_objects", "i", request_all_objects_cb, this);
lo_server_thread_add_method(_st, "/om/request/port_value", "is", request_port_value_cb, this);
lo_server_add_method(_server, "/om/request/plugins", "i", request_plugins_cb, this);
lo_server_add_method(_server, "/om/request/all_objects", "i", request_all_objects_cb, this);
lo_server_add_method(_server, "/om/request/port_value", "is", request_port_value_cb, this);
// DSSI support
#ifdef HAVE_DSSI
// XXX WARNING: notice this is a catch-all
lo_server_thread_add_method(_st, NULL, NULL, dssi_cb, this);
lo_server_add_method(_server, NULL, NULL, dssi_cb, this);
#endif
lo_server_thread_add_method(_st, NULL, NULL, unknown_cb, NULL);
lo_server_add_method(_server, NULL, NULL, unknown_cb, NULL);
}
@@ -129,9 +128,9 @@ OSCReceiver::~OSCReceiver()
{
deactivate();
if (_st != NULL) {
lo_server_thread_free(_st);
_st = NULL;
if (_server != NULL) {
lo_server_free(_server);
_server = NULL;
}
}
@@ -139,41 +138,49 @@ OSCReceiver::~OSCReceiver()
void
OSCReceiver::start()
{
set_name("OSCReceiver");
QueuedEventSource::start();
if (!_is_activated) {
lo_server_thread_start(_st);
_is_activated = true;
}
/* Waiting on the next liblo release
pthread_t lo_thread = lo_server_thread_get_thread(_st);
sched_param sp;
sp.sched_priority = 20;
int result = pthread_setschedparam(lo_thread, SCHED_FIFO, &sp);
if (!result)
cout << "[OSC] Set OSC thread to realtime scheduling (SCHED_FIFO, priority "
<< sp.sched_priority << ")" << endl;
else
cout << "[OSC] Unable to set OSC thread to realtime scheduling ("
<< strerror(result) << endl;
*/
set_scheduling(SCHED_FIFO, 10);
}
void
OSCReceiver::stop()
{
if (_is_activated) {
lo_server_thread_stop(_st);
cout << "[OSCReceiver] Stopped OSC server thread" << endl;
_is_activated = false;
}
cout << "[OSCReceiver] Stopped OSC listening thread" << endl;
QueuedEventSource::stop();
}
/** Override the semaphore driven _run method of QueuedEngineInterface
* to wait on OSC messages and prepare them right away in the same thread.
*/
void
OSCReceiver::_run()
{
/* FIXME: Make Event() take a timestamp as a parameter, get a timestamp
* here and stamp all the events with the same time so they all get
* executed in the same cycle */
while (true) {
assert( ! unprepared_events());
// Wait on a message and enqueue it
lo_server_recv(_server);
// Enqueue every other message that is here "now"
// (would this provide truly atomic bundles?)
while (lo_server_recv_noblock(_server, 0) > 0) ;
// Process them all
while (unprepared_events())
_signalled();
// No more unprepared events
}
}
/** Create a new responder for this message, if necessary.
*
* This is based on the fact that the current responder is stored in a ref

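To exercise the paths registered above from a client, plain liblo calls suffice. A hedged sketch (the port, request ids, and patch/port paths are made up; argument layouts follow the type signatures registered in the constructor):

#include <lo/lo.h>

int main()
{
	// The engine's port is whatever was passed to OSCReceiver (hypothetical value here)
	lo_address engine = lo_address_new("localhost", "16180");

	// Every message starts with an integer request id (the leading "i" in the signatures above)
	lo_send(engine, "/om/ping", "i", 1);
	lo_send(engine, "/om/synth/create_patch", "isi", 2, "/my_patch", 1);
	lo_send(engine, "/om/synth/set_port_value", "isf", 3, "/my_patch/volume", 0.5f);

	lo_address_free(engine);
	return 0;
}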
View File

@@ -43,6 +43,8 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* osc_receiver)\
{ return ((OSCReceiver*)osc_receiver)->m_##name##_cb(path, types, argv, argc, msg); }
/* FIXME: Make this receive and preprocess in the same thread? */
/** Receives OSC messages from liblo.
*
@@ -66,7 +68,9 @@ private:
// Prevent copies (undefined)
OSCReceiver(const OSCReceiver&);
OSCReceiver& operator=(const OSCReceiver&);
virtual void _run();
static void error_cb(int num, const char* msg, const char* path);
static int set_response_address_cb(LO_HANDLER_ARGS, void* osc_receiver);
static int generic_cb(LO_HANDLER_ARGS, void* osc_receiver);
@@ -112,8 +116,7 @@ private:
#endif
const char* const _port;
bool _is_activated;
lo_server_thread _st;
lo_server _server;
/** Cached OSC responder (for most recent incoming message) */
CountedPtr<OSCResponder> _osc_responder;

View File

@@ -162,7 +162,7 @@ ObjectSender::send_port(ClientInterface* client, const Port* port)
if (port->type() == DataType::FLOAT && port->buffer_size() == 1) {
sample default_value = dynamic_cast<const TypedPort<sample>*>(
port)->buffer(0)->value_at(0);
cerr << port->path() << " sending default value " << default_value << endl;
//cerr << port->path() << " sending default value " << default_value << endl;
client->control_change(port->path(), default_value);
}

View File

@@ -29,94 +29,28 @@ using std::cerr; using std::cout; using std::endl;
namespace Om {
bool PostProcessor::m_process_thread_exit_flag = false;
PostProcessor::PostProcessor(size_t queue_size)
: m_events(queue_size),
m_thread_exists(false),
m_semaphore(0)
: _events(queue_size)
{
set_name("PostProcessor");
}
PostProcessor::~PostProcessor()
{
stop();
}
/** Start the process thread.
/** Post processing thread.
*
* Infinite loop that waits on the semaphore and processes every enqueued
* event (to be signalled at the end of every process cycle).
*/
void
PostProcessor::start()
PostProcessor::_signalled()
{
cout << "[PostProcessor] Starting." << endl;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 1500000);
pthread_create(&m_process_thread, &attr, process_events, this);
m_thread_exists = true;
}
/** Stop the process thread.
*/
void
PostProcessor::stop()
{
if (m_thread_exists) {
m_process_thread_exit_flag = true;
pthread_cancel(m_process_thread);
pthread_join(m_process_thread, NULL);
m_thread_exists = false;
while ( ! _events.is_empty()) {
Event* const ev = _events.pop();
assert(ev);
ev->post_process();
om->maid()->push(ev);
}
}
/** Signal the PostProcessor to process all pending events.
*/
void
PostProcessor::signal()
{
m_semaphore.post();
}
void*
PostProcessor::process_events(void* osc_processer)
{
PostProcessor* me = (PostProcessor*)osc_processer;
return me->m_process_events();
}
/** OSC message processing thread.
*/
void*
PostProcessor::m_process_events()
{
Event* ev = NULL;
while (true) {
m_semaphore.wait();
if (m_process_thread_exit_flag)
break;
while (!m_events.is_empty()) {
ev = m_events.pop();
assert(ev != NULL);
ev->post_process();
om->maid()->push(ev);
}
}
cout << "[PostProcessor] Exiting post processor thread." << endl;
return NULL;
}
} // namespace Om

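The driving side is the realtime thread: it pushes each executed event and signals once per cycle, the pattern already visible in JackAudioDriver::process_events above. A hedged sketch (the wrapper function is illustrative and project headers are omitted; push() and signal() are the real API):

// Realtime thread, once per process cycle:
void process_queued_events(EventSource* source, PostProcessor* pp, samplecount block_end)
{
	Event* ev = NULL;
	while ((ev = source->pop_earliest_before(block_end)) != NULL) {
		ev->execute(0);  // run in the audio thread
		pp->push(ev);    // RT-safe enqueue for post-processing
	}
	pp->signal();        // wake the PostProcessor slave thread once per cycle
}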
View File

@@ -21,6 +21,7 @@
#include "types.h"
#include "util/Queue.h"
#include "util/Semaphore.h"
#include "Slave.h"
namespace Om {
@@ -35,44 +36,24 @@ class Event;
*
* \ingroup engine
*/
class PostProcessor
class PostProcessor : public Slave
{
public:
PostProcessor(size_t queue_size);
~PostProcessor();
void start();
void stop();
inline void push(Event* const ev);
void signal();
/** Push an event on to the process queue, realtime-safe, not thread-safe. */
inline void push(Event* const ev) { _events.push(ev); }
private:
// Prevent copies
PostProcessor(const PostProcessor&);
PostProcessor& operator=(const PostProcessor&);
Queue<Event*> m_events;
static void* process_events(void* me);
void* m_process_events();
pthread_t m_process_thread;
bool m_thread_exists;
static bool m_process_thread_exit_flag;
Semaphore m_semaphore;
Queue<Event*> _events;
virtual void _signalled();
};
/** Push an event on to the process queue, realtime-safe, not thread-safe.
*/
inline void
PostProcessor::push(Event* const ev)
{
m_events.push(ev);
}
} // namespace Om
#endif // POSTPROCESSOR_H

View File

@@ -25,20 +25,15 @@ namespace Om {
QueuedEventSource::QueuedEventSource(size_t size)
: m_front(0),
m_back(0),
m_prepared_back(0),
m_size(size+1),
m_thread_exists(false),
m_prepare_thread_exit_flag(false),
m_semaphore(0)
: _front(0),
_back(0),
_prepared_back(0),
_size(size+1),
_blocking_semaphore(0)
{
m_events = (QueuedEvent**)calloc(m_size, sizeof(QueuedEvent*));
_events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*));
pthread_mutex_init(&m_blocking_mutex, NULL);
pthread_cond_init(&m_blocking_cond, NULL);
mlock(m_events, m_size * sizeof(QueuedEvent*));
mlock(_events, _size * sizeof(QueuedEvent*));
}
@@ -46,49 +41,7 @@ QueuedEventSource::~QueuedEventSource()
{
stop();
free(m_events);
pthread_mutex_destroy(&m_blocking_mutex);
pthread_cond_destroy(&m_blocking_cond);
}
/** Start the prepare thread.
*/
void
QueuedEventSource::start()
{
if (m_thread_exists) {
cerr << "[QueuedEventSource] Thread already launched?" << endl;
return;
} else {
cout << "[QueuedEventSource] Launching thread." << endl;
}
m_prepare_thread_exit_flag = false;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 1500000);
pthread_create(&m_prepare_thread, &attr, &QueuedEventSource::prepare_loop, this);
pthread_attr_destroy(&attr);
m_thread_exists = true;
}
/** Destroy the prepare thread.
*/
void
QueuedEventSource::stop()
{
if (m_thread_exists) {
m_prepare_thread_exit_flag = true;
pthread_cancel(m_prepare_thread);
pthread_join(m_prepare_thread, NULL);
m_thread_exists = false;
cout << "[QueuedEventSource] Stopped thread." << endl;
}
free(_events);
}
@@ -99,13 +52,13 @@ QueuedEventSource::push(QueuedEvent* const ev)
{
assert(!ev->is_prepared());
if (m_events[m_back] != NULL) {
if (_events[_back] != NULL) {
cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl;
delete ev;
} else {
m_events[m_back] = ev;
m_back = (m_back + 1) % m_size;
m_semaphore.post();
_events[_back] = ev;
_back = (_back + 1) % _size;
signal();
}
}
@@ -115,18 +68,18 @@ QueuedEventSource::push(QueuedEvent* const ev)
* This method will only pop events that have been prepared, and are
* stamped before the time passed. In other words, it may return NULL
* even if there are events pending in the queue. The events returned are
* actually QueuedEvent*s, but after this they are "normal" events and the
* actually QueuedEvents, but after this they are "normal" events and the
* engine deals with them just like a realtime in-band event.
*/
Event*
QueuedEventSource::pop_earliest_event_before(const samplecount time)
QueuedEventSource::pop_earliest_before(const samplecount time)
{
QueuedEvent* front_event = m_events[m_front];
QueuedEvent* const front_event = _events[_front];
// Pop
if (front_event != NULL && front_event->time_stamp() < time && front_event->is_prepared()) {
m_events[m_front] = NULL;
m_front = (m_front + 1) % m_size;
if (front_event && front_event->is_prepared() && front_event->time_stamp() < time) {
_events[_front] = NULL;
_front = (_front + 1) % _size;
return front_event;
} else {
return NULL;
@@ -137,63 +90,40 @@ QueuedEventSource::pop_earliest_event_before(const samplecount time)
// Private //
/** Signal that the blocking event is finished.
*
* When this is called, preparing will resume. This will be called by
* When this is called, preparing will resume. This MUST be called by
* blocking events in their post_process() method.
*/
void
QueuedEventSource::unblock()
{
/* FIXME: Make this a semaphore, and have events signal at the end of their
* execute() methods so the preprocessor can start preparing events immediately
* instead of waiting for the postprocessor to get around to finalizing the event? */
pthread_mutex_lock(&m_blocking_mutex);
pthread_cond_signal(&m_blocking_cond);
pthread_mutex_unlock(&m_blocking_mutex);
_blocking_semaphore.post();
}
void*
QueuedEventSource::m_prepare_loop()
void
QueuedEventSource::_signalled()
{
QueuedEvent* ev = NULL;
QueuedEvent* const ev = _events[_prepared_back];
assert(ev != NULL);
while (true) {
m_semaphore.wait();
if (m_prepare_thread_exit_flag)
break; // exit signalled
ev = m_events[m_prepared_back];
assert(ev != NULL);
if (ev == NULL) {
cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl;
continue;
}
assert(ev != NULL);
assert(!ev->is_prepared());
if (ev->is_blocking())
pthread_mutex_lock(&m_blocking_mutex);
ev->pre_process();
m_prepared_back = (m_prepared_back+1) % m_size;
// If a blocking event, wait for event to finish passing through
// the audio cycle before preparing the next event
if (ev->is_blocking()) {
pthread_cond_wait(&m_blocking_cond, &m_blocking_mutex);
pthread_mutex_unlock(&m_blocking_mutex);
}
if (ev == NULL) {
cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl;
return;
}
cout << "[QueuedEventSource] Exiting slow event queue thread." << endl;
return NULL;
assert(ev != NULL);
assert(!ev->is_prepared());
ev->pre_process();
_prepared_back = (_prepared_back+1) % _size;
// If the event was blocking, wait for it to be run through the
// process thread before preparing the next event
if (ev->is_blocking())
_blocking_semaphore.wait();
}

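For the blocking case, the contract in the comments above is that the event itself re-arms the source. A heavily hedged sketch (QueuedEvent's real interface, and how an event obtains a pointer to its source, are not shown in this diff):

// Hypothetical blocking event (illustrative names):
class SaveBlockingEvent : public QueuedEvent
{
public:
	bool is_blocking() { return true; }

	void post_process()
	{
		// ... report results to the client ...
		_source->unblock();  // REQUIRED: lets _signalled() prepare the next event
	}

private:
	QueuedEventSource* _source;
};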
View File

@@ -21,6 +21,7 @@
#include <pthread.h>
#include "types.h"
#include "util/Semaphore.h"
#include "Slave.h"
#include "EventSource.h"
namespace Om {
@@ -35,22 +36,27 @@ class QueuedEvent;
* popping are threadsafe, as long as a single thread pushes and a single
* thread pops (ie this data structure is threadsafe, but the push and pop
* methods themselves are not).
*
* This class is its own slave. :)
*/
class QueuedEventSource : public EventSource
class QueuedEventSource : public EventSource, protected Slave
{
public:
QueuedEventSource(size_t size);
~QueuedEventSource();
Event* pop_earliest_event_before(const samplecount time);
void start() { Thread::start(); }
void stop() { Thread::stop(); }
Event* pop_earliest_before(const samplecount time);
void unblock();
void start();
void stop();
protected:
void push(QueuedEvent* const ev);
bool unprepared_events() { return (_prepared_back != _back); }
virtual void _signalled(); ///< Prepare 1 event
private:
// Prevent copies (undefined)
@@ -60,21 +66,12 @@ private:
// Note that it's crucially important which functions access which of these
// variables, to maintain threadsafeness.
size_t m_front; ///< Front of queue
size_t m_back; ///< Back of entire queue (1 past index of back element)
size_t m_prepared_back; ///< Back of prepared section (1 past index of back prepared element)
const size_t m_size;
QueuedEvent** m_events;
bool m_thread_exists;
bool m_prepare_thread_exit_flag;
pthread_t m_prepare_thread;
Semaphore m_semaphore; ///< Counting semaphore for driving prepare thread
pthread_mutex_t m_blocking_mutex;
pthread_cond_t m_blocking_cond;
static void* prepare_loop(void* q) { return ((QueuedEventSource*)q)->m_prepare_loop(); }
void* m_prepare_loop();
size_t _front; ///< Front of queue
size_t _back; ///< Back of entire queue (1 past index of back element)
size_t _prepared_back; ///< Back of prepared section (1 past index of back prepared element)
const size_t _size;
QueuedEvent** _events;
Semaphore _blocking_semaphore;
};

View File

@@ -0,0 +1,60 @@
/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
*
* Om is free software; you can redistribute it and/or modify it under the
* terms of the GNU General Public License as published by the Free Software
* Foundation; either version 2 of the License, or (at your option) any later
* version.
*
* Om is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef SLAVE_H
#define SLAVE_H
#include <pthread.h>
#include "util/Semaphore.h"
#include "Thread.h"
namespace Om {
/** Thread driven by (realtime safe) signals.
*
* \ingroup engine
*/
class Slave : public Thread
{
public:
Slave() : _semaphore(0) {}
inline void signal() { _semaphore.post(); }
protected:
virtual void _signalled() = 0;
Semaphore _semaphore;
private:
// Prevent copies
Slave(const Slave&);
Slave& operator=(const Slave&);
void _run()
{
while (true) {
_semaphore.wait();
_signalled();
}
}
};
} // namespace Om
#endif // SLAVE_H

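Usage: subclass, implement _signalled(), start() the thread, then signal() it from anywhere (signal() is just a semaphore post, so it is realtime safe). A minimal sketch with a made-up worker:

#include <iostream>
#include "Slave.h"

// Hypothetical slave that counts its wake-ups
class Counter : public Om::Slave
{
public:
	Counter() : _count(0) {}

protected:
	void _signalled() { std::cout << "woken " << ++_count << " times" << std::endl; }

private:
	unsigned _count;
};

// Elsewhere:
//   Counter c;
//   c.set_name("Counter");
//   c.start();    // launches the waiting thread
//   c.signal();   // wakes it; _signalled() runs in c's own thread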
View File

@@ -0,0 +1,104 @@
/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
*
* Om is free software; you can redistribute it and/or modify it under the
* terms of the GNU General Public License as published by the Free Software
* Foundation; either version 2 of the License, or (at your option) any later
* version.
*
* Om is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "Thread.h"
#include <cassert>
#include <iostream>
#include <cstring> // for strerror()
#include <pthread.h>
using std::cerr; using std::cout; using std::endl;
namespace Om {
Thread::Thread()
: _pthread_exists(false)
{
}
Thread::~Thread()
{
stop();
}
/** Start the process thread.
*/
void
Thread::start()
{
cout << "[" << _name << " Thread] Starting." << endl;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 1500000);
pthread_create(&_pthread, &attr, _static_run, this);
_pthread_exists = true;
}
/** Stop the process thread.
*/
void
Thread::stop()
{
if (_pthread_exists) {
pthread_cancel(_pthread);
pthread_join(_pthread, NULL);
_pthread_exists = false;
}
}
/** Set the scheduling policy for this thread.
*
* @param policy must be one of SCHED_FIFO, SCHED_RR, or SCHED_OTHER.
*/
void
Thread::set_scheduling(int policy, unsigned int priority)
{
sched_param sp;
sp.sched_priority = priority;
int result = pthread_setschedparam(_pthread, policy, &sp);
if (!result) {
cout << "[" << _name << " Thread] Set scheduling policy to ";
switch (policy) {
case SCHED_FIFO: cout << "SCHED_FIFO"; break;
case SCHED_RR: cout << "SCHED_RR"; break;
case SCHED_OTHER: cout << "SCHED_OTHER"; break;
default: cout << "UNKNOWN"; break;
}
cout << ", priority " << sp.sched_priority << endl;
} else {
cout << "[" << _name << " Thread] Unable to set scheduling policy ("
<< strerror(result) << ")" << endl;
}
}
void*
Thread::_static_run(void* me)
{
Thread* myself = (Thread*)me;
myself->_run();
// and I
return NULL;
}
} // namespace Om

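Slave (above) covers the signal-driven case; a free-running thread just overrides _run() directly. A minimal hedged sketch (the Ticker class is made up):

#include <unistd.h>
#include <iostream>
#include "Thread.h"

// Hypothetical free-running thread built on the new base class
class Ticker : public Om::Thread
{
protected:
	void _run()
	{
		while (true) {  // stop() ends this via pthread_cancel
			std::cout << "[" << _name << "] tick" << std::endl;
			sleep(1);   // sleep() is a cancellation point
		}
	}
};

// Ticker t;
// t.set_name("Ticker");
// t.start();
// t.set_scheduling(SCHED_FIFO, 10);  // optional, as OSCReceiver::start() does above
// t.stop();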
View File

@@ -0,0 +1,63 @@
/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
*
* Om is free software; you can redistribute it and/or modify it under the
* terms of the GNU General Public License as published by the Free Software
* Foundation; either version 2 of the License, or (at your option) any later
* version.
*
* Om is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef THREAD_H
#define THREAD_H
#include <string>
#include <pthread.h>
namespace Om {
/* FIXME: This isn't Ingen specific at all. Move it to util. */
/** Abstract base class for all threads.
*
* \ingroup engine
*/
class Thread
{
public:
Thread();
virtual ~Thread();
virtual void start();
virtual void stop();
void set_name(const std::string& name) { _name = name; }
void set_scheduling(int policy, unsigned int priority);
protected:
virtual void _run() = 0;
std::string _name;
pthread_t _pthread;
bool _pthread_exists;
private:
// Prevent copies
Thread(const Thread&);
Thread& operator=(const Thread&);
static void* _static_run(void* me);
};
} // namespace Om
#endif // THREAD_H