1
Fork 0

modules: add reconnect support to module-pulse-tunnel

When the pulse connection is broken, reconnect with the given interval.

Add the reconnect_interval_ms property to the pulse modules.
This commit is contained in:
Wim Taymans 2024-01-05 13:34:19 +01:00
parent b4be094be8
commit 3da66734bd
3 changed files with 106 additions and 40 deletions

View File

@ -32,6 +32,7 @@ static const char *const pulse_module_options =
"sink=<name of the remote sink> "
"sink_name=<name for the local sink> "
"sink_properties=<properties for the local sink> "
"reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
"format=<sample format> "
"channels=<number of channels> "
"rate=<sample rate> "
@ -181,6 +182,10 @@ static int module_tunnel_sink_prepare(struct module * const module)
}
audioinfo_to_properties(&info, stream_props);
if ((str = pw_properties_get(props, "reconnect_interval_ms")) != NULL) {
pw_properties_set(props, "reconnect.interval.ms", str);
pw_properties_set(props, "reconnect_interval_ms", NULL);
}
if ((str = pw_properties_get(props, "latency_msec")) != NULL) {
pw_properties_set(props, "pulse.latency", str);
pw_properties_set(props, "latency_msec", NULL);

View File

@ -32,6 +32,7 @@ static const char *const pulse_module_options =
"source=<name of the remote source> "
"source_name=<name for the local source> "
"source_properties=<properties for the local source> "
"reconnect_interval_ms=<interval to try reconnects, 0 or omitted if disabled> "
"format=<sample format> "
"channels=<number of channels> "
"rate=<sample rate> "
@ -178,6 +179,10 @@ static int module_tunnel_source_prepare(struct module * const module)
}
audioinfo_to_properties(&info, stream_props);
if ((str = pw_properties_get(props, "reconnect_interval_ms")) != NULL) {
pw_properties_set(props, "reconnect.interval.ms", str);
pw_properties_set(props, "reconnect_interval_ms", NULL);
}
if ((str = pw_properties_get(props, "latency_msec")) != NULL) {
pw_properties_set(props, "pulse.latency", str);
pw_properties_set(props, "latency_msec", NULL);

View File

@ -58,6 +58,9 @@
* - `pulse.server.address`: the address of the PulseAudio server to tunnel to.
* - `pulse.latency`: the latency to end-to-end latency in milliseconds to
* maintain (Default 200).
* - `reconnect.interval.ms`: when the remote connection is broken, retry to connect
* with this interval in millisconds. A value of 0 disables recovery
* and will result in a module unload. (Default 0) (Since 1.1.0)
* - `stream.props`: Extra properties for the local stream.
*
* ## General options
@ -87,6 +90,7 @@
* # Set the remote address to tunnel to
* pulse.server.address = "tcp:192.168.1.126"
* #pulse.latency = 200
* #reconnect.interval.ms = 0
* #audio.rate=<sample rate>
* #audio.channels=<number of channels>
* #audio.position=<channel map>
@ -121,6 +125,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"( audio.position=<channel map> ] " \
"pulse.server.address=<address> " \
"( pulse.latency=<latency in msec, default 200> ) " \
"( reconnect.interval.ms=<reconnect interval in msec, default 0> ) " \
"( tunnel.mode=source|sink, default sink ) " \
"( stream.props=<properties> ) "
@ -184,9 +189,16 @@ struct impl {
float max_error;
unsigned resync:1;
unsigned int do_disconnect:1;
bool do_disconnect:1;
bool stopping;
struct spa_source *timer;
uint32_t reconnect_interval_ms;
bool recovering;
};
static int start_pulse_connection(struct impl *impl);
static void cork_stream(struct impl *impl, bool cork)
{
pa_operation *operation;
@ -463,8 +475,8 @@ static int create_stream(struct impl *impl)
struct spa_pod_builder b;
struct spa_latency_info latency;
impl->stream = pw_stream_new(impl->core, "pulse", impl->stream_props);
impl->stream_props = NULL;
impl->stream = pw_stream_new(impl->core, "pulse",
pw_properties_copy(impl->stream_props));
if (impl->stream == NULL)
return -errno;
@ -503,19 +515,61 @@ static int create_stream(struct impl *impl)
return 0;
}
static void cleanup_streams(struct impl *impl)
{
if (impl->pa_mainloop) {
pa_threaded_mainloop_stop(impl->pa_mainloop);
pa_threaded_mainloop_lock(impl->pa_mainloop);
}
if (impl->pa_stream) {
pa_stream_unref(impl->pa_stream);
impl->pa_stream = NULL;
}
if (impl->pa_context) {
pa_context_disconnect(impl->pa_context);
pa_context_unref(impl->pa_context);
impl->pa_context = NULL;
}
if (impl->pa_mainloop) {
pa_threaded_mainloop_unlock(impl->pa_mainloop);
pa_threaded_mainloop_free(impl->pa_mainloop);
impl->pa_mainloop = NULL;
}
if (impl->stream)
pw_stream_destroy(impl->stream);
}
static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
cleanup_streams(impl);
start_pulse_connection(impl);
}
static int
do_schedule_destroy(struct spa_loop *loop,
do_schedule_recovery(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
if (impl->module)
pw_impl_module_schedule_destroy(impl->module);
if (impl->reconnect_interval_ms > 0) {
struct timespec value;
uint64_t timestamp;
timestamp = impl->reconnect_interval_ms * SPA_NSEC_PER_MSEC;
value.tv_sec = timestamp / SPA_NSEC_PER_SEC;
value.tv_nsec = timestamp % SPA_NSEC_PER_SEC;
pw_loop_update_timer(impl->main_loop, impl->timer, &value, NULL, false);
} else {
if (impl->module)
pw_impl_module_schedule_destroy(impl->module);
}
return 0;
}
static void module_schedule_destroy(struct impl *impl)
static void schedule_recovery(struct impl *impl)
{
pw_loop_invoke(impl->main_loop, do_schedule_destroy, 1, NULL, 0, false, impl);
if (!impl->stopping)
pw_loop_invoke(impl->main_loop, do_schedule_recovery, 1, NULL, 0, false, impl);
}
static int
@ -527,7 +581,8 @@ do_create_stream(struct spa_loop *loop,
if (impl->stream == NULL) {
if ((res = create_stream(impl)) < 0) {
pw_log_error("failed to create stream: %s", spa_strerror(res));
do_schedule_destroy(loop, async, seq, NULL, 0, user_data);
if (impl->module)
pw_impl_module_schedule_destroy(impl->module);
}
}
return 0;
@ -542,22 +597,22 @@ static void stream_state_cb(pa_stream *s, void * userdata)
pw_log_debug("stream state %d", state);
switch (state) {
case PA_STREAM_FAILED:
case PA_STREAM_TERMINATED:
do_destroy = true;
SPA_FALLTHROUGH;
case PA_STREAM_CREATING:
break;
case PA_STREAM_READY:
impl->pa_index = pa_stream_get_index(impl->pa_stream);
pw_loop_invoke(impl->main_loop, do_create_stream, 1, NULL, 0, false, impl);
break;
case PA_STREAM_FAILED:
case PA_STREAM_TERMINATED:
case PA_STREAM_UNCONNECTED:
do_destroy = true;
break;
case PA_STREAM_CREATING:
break;
}
if (do_destroy)
module_schedule_destroy(impl);
if (do_destroy) {
pw_log_warn("stream failure: %d", state);
schedule_recovery(impl);
}
}
static void stream_read_request_cb(pa_stream *s, size_t length, void *userdata)
@ -865,25 +920,25 @@ static void context_state_cb(pa_context *c, void *userdata)
pw_log_debug("state %d", state);
switch (state) {
case PA_CONTEXT_TERMINATED:
case PA_CONTEXT_FAILED:
do_destroy = true;
SPA_FALLTHROUGH;
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
case PA_CONTEXT_SETTING_NAME:
break;
case PA_CONTEXT_READY:
if (impl->pa_stream == NULL)
if (create_pulse_stream(impl) < 0)
do_destroy = true;
break;
case PA_CONTEXT_TERMINATED:
case PA_CONTEXT_UNCONNECTED:
case PA_CONTEXT_FAILED:
do_destroy = true;
break;
case PA_CONTEXT_CONNECTING:
case PA_CONTEXT_AUTHORIZING:
case PA_CONTEXT_SETTING_NAME:
break;
}
if (do_destroy)
module_schedule_destroy(impl);
if (do_destroy) {
pw_log_warn("connection failure: %s", pa_strerror(pa_context_errno(c)));
schedule_recovery(impl);
}
}
static pa_proplist* tunnel_new_proplist(struct impl *impl)
@ -975,20 +1030,10 @@ static const struct pw_proxy_events core_proxy_events = {
static void impl_destroy(struct impl *impl)
{
impl->stopping = true;
if (impl->pa_mainloop)
pa_threaded_mainloop_stop(impl->pa_mainloop);
if (impl->pa_stream)
pa_stream_unref(impl->pa_stream);
if (impl->pa_context) {
pa_context_disconnect(impl->pa_context);
pa_context_unref(impl->pa_context);
}
if (impl->pa_mainloop)
pa_threaded_mainloop_free(impl->pa_mainloop);
cleanup_streams(impl);
if (impl->stream)
pw_stream_destroy(impl->stream);
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
@ -997,6 +1042,9 @@ static void impl_destroy(struct impl *impl)
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
if (impl->timer)
pw_loop_destroy_source(impl->main_loop, impl->timer);
free(impl->buffer);
free(impl);
}
@ -1170,10 +1218,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}
}
impl->reconnect_interval_ms = pw_properties_get_uint32(props,
"reconnect.interval.ms", 0);
impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
goto error;
}
impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC);
if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL)
pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true");
if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL)