LADI
/
spa
1
Fork 0

Compare commits

...

10 Commits

Author SHA1 Message Date
Pauli Virtanen 7aae8c45ec bluez5: do not delay transport release if it is not active
Transport release should not be delayed if it is not active, since the
fd cannot be used any more, and the transport needs to be reacquired to
get a working fd.

Fixes reacquiring transports if the remote side causes them to become
inactive.
2023-05-26 07:17:51 +00:00
Wim Taymans 2ee7996cf7 impl-node: improve stats
The signal_time of the driver is supposed to be the time when the
driver started, not when it was signaled the last time to complete
the graph. Remember the start time and override the signal time when
the graph completes.
2023-05-25 20:22:49 +02:00
Wim Taymans 3d68c7124d impl-node: drivers run remotely now
Remove some code now that drivers also run remotely.
2023-05-25 20:06:38 +02:00
Wim Taymans 09f480ccb3 audioconvert: fix monitor port latency
The monitor port latency is the reverse direction of the input ports.
2023-05-25 20:05:54 +02:00
Wim Taymans a5b845650e remote-node: only signal graph end when profiling
Add a flag to the activation to mark the node as being profiled.
Only wake up the eventfd in remote-node when the profiler is running.

This keeps the server sleeping when remote nodes are driving and the
profiler is not running.
2023-05-24 22:40:28 +02:00
Wim Taymans 5fcc0e1181 filter: fix warning 2023-05-24 18:24:06 +02:00
Wim Taymans 8303082024 stream: do fast calls for the process function
We checked it while registering.
2023-05-24 18:01:41 +02:00
Wim Taymans 004d3d900a pulse-server: improve debug 2023-05-24 18:01:19 +02:00
Wim Taymans 3605eae6ad impl-node: fix compilation 2023-05-24 17:38:40 +02:00
Wim Taymans 2994c48702 impl-node: only copy relevant fields from target
We can't copy the link or active state.
2023-05-24 14:35:47 +02:00
10 changed files with 106 additions and 48 deletions

View File

@@ -2026,8 +2026,13 @@ impl_node_port_enum_params(void *object, int seq,
case SPA_PARAM_Latency:
switch (result.index) {
case 0: case 1:
param = spa_latency_build(&b, id, &this->dir[result.index].latency);
{
uint32_t idx = result.index;
if (port->is_monitor)
idx = idx ^ 1;
param = spa_latency_build(&b, id, &this->dir[idx].latency);
break;
}
default:
return 0;
}

View File

@@ -203,6 +203,7 @@ static int spa_bt_transport_stop_volume_timer(struct spa_bt_transport *transport
static int spa_bt_transport_start_volume_timer(struct spa_bt_transport *transport);
static int spa_bt_transport_stop_release_timer(struct spa_bt_transport *transport);
static int spa_bt_transport_start_release_timer(struct spa_bt_transport *transport);
static void spa_bt_transport_commit_release_timer(struct spa_bt_transport *transport);
static int device_start_timer(struct spa_bt_device *device);
static int device_stop_timer(struct spa_bt_device *device);
@@ -2589,6 +2590,13 @@ void spa_bt_transport_set_state(struct spa_bt_transport *transport, enum spa_bt_
if (state >= SPA_BT_TRANSPORT_STATE_PENDING && old < SPA_BT_TRANSPORT_STATE_PENDING)
transport_sync_volume(transport);
if (state < SPA_BT_TRANSPORT_STATE_ACTIVE) {
/* If transport becomes inactive, do any pending releases
* immediately, since the fd is not usable any more.
*/
spa_bt_transport_commit_release_timer(transport);
}
if (state == SPA_BT_TRANSPORT_STATE_ERROR) {
uint64_t now = get_time_now(monitor);
@@ -2719,6 +2727,27 @@ int spa_bt_transport_acquire(struct spa_bt_transport *transport, bool optional)
return res;
}
static void spa_bt_transport_do_release(struct spa_bt_transport *transport)
{
struct spa_bt_monitor *monitor = transport->monitor;
spa_assert(transport->acquire_refcount >= 1);
spa_assert(transport->acquired);
if (transport->acquire_refcount == 1) {
if (!transport->keepalive) {
spa_bt_transport_impl(transport, release, 0);
transport->acquired = false;
} else {
spa_log_debug(monitor->log, "transport %p: keepalive %s on release",
transport, transport->path);
}
} else {
spa_log_debug(monitor->log, "transport %p: delayed decref %s", transport, transport->path);
}
transport->acquire_refcount -= 1;
}
int spa_bt_transport_release(struct spa_bt_transport *transport)
{
struct spa_bt_monitor *monitor = transport->monitor;
@@ -2736,8 +2765,15 @@ int spa_bt_transport_release(struct spa_bt_transport *transport)
spa_assert(transport->acquire_refcount == 1);
spa_assert(transport->acquired);
/* Postpone transport releases, since we might need it again soon */
return spa_bt_transport_start_release_timer(transport);
/* Postpone active transport releases, since we might need it again soon.
* If not active, release now since it has to be reacquired before using again.
*/
if (transport->state == SPA_BT_TRANSPORT_STATE_ACTIVE) {
return spa_bt_transport_start_release_timer(transport);
} else {
spa_bt_transport_do_release(transport);
return 0;
}
}
static int spa_bt_transport_release_now(struct spa_bt_transport *transport)
@@ -2808,25 +2844,9 @@ static int stop_timeout_timer(struct spa_bt_monitor *monitor, struct spa_source
static void spa_bt_transport_release_timer_event(struct spa_source *source)
{
struct spa_bt_transport *transport = source->data;
struct spa_bt_monitor *monitor = transport->monitor;
spa_assert(transport->acquire_refcount >= 1);
spa_assert(transport->acquired);
spa_bt_transport_stop_release_timer(transport);
if (transport->acquire_refcount == 1) {
if (!transport->keepalive) {
spa_bt_transport_impl(transport, release, 0);
transport->acquired = false;
} else {
spa_log_debug(monitor->log, "transport %p: keepalive %s on release",
transport, transport->path);
}
} else {
spa_log_debug(monitor->log, "transport %p: delayed decref %s", transport, transport->path);
}
transport->acquire_refcount -= 1;
spa_bt_transport_do_release(transport);
}
static int spa_bt_transport_start_release_timer(struct spa_bt_transport *transport)
@@ -2842,6 +2862,17 @@ static int spa_bt_transport_stop_release_timer(struct spa_bt_transpor
return stop_timeout_timer(transport->monitor, &transport->release_timer);
}
static void spa_bt_transport_commit_release_timer(struct spa_bt_transport *transport)
{
struct spa_bt_monitor *monitor = transport->monitor;
/* Do release now if it is pending */
if (transport->release_timer.data) {
spa_log_debug(monitor->log, "transport %p: commit pending release", transport);
spa_bt_transport_release_timer_event(&transport->release_timer);
}
}
static void spa_bt_transport_volume_changed(struct spa_bt_transport *transport)
{
struct spa_bt_monitor *monitor = transport->monitor;

View File

@@ -1170,19 +1170,13 @@ static const struct pw_proxy_events proxy_client_node_events = {
.bound_props = client_node_bound_props,
};
static inline uint64_t get_time_ns(struct spa_system *system)
{
struct timespec ts;
spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static void context_complete(void *data, struct pw_impl_node *node)
{
struct node_data *d = data;
struct spa_system *data_system = d->data_system;
if (node != d->node || !node->driving)
if (node != d->node || !node->driving ||
!SPA_FLAG_IS_SET(node->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER))
return;
if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, d->rtwritefd, 1) < 0))

View File

@@ -179,11 +179,18 @@ uint32_t stream_pop_missing(struct stream *stream)
missing -= stream->requested;
missing -= avail;
if (missing <= 0)
if (missing <= 0) {
pw_log_debug("stream %p: (tlen:%u - req:%"PRIi64" - avail:%"PRIi64") <= 0",
stream, stream->attr.tlength, stream->requested, avail);
return 0;
}
if (missing < stream->attr.minreq && !stream_prebuf_active(stream, avail))
if (missing < stream->attr.minreq && !stream_prebuf_active(stream, avail)) {
pw_log_debug("stream %p: (tlen:%u - req:%"PRIi64" - avail:%"PRIi64") <= minreq:%u",
stream, stream->attr.tlength, stream->requested, avail,
stream->attr.minreq);
return 0;
}
stream->requested += missing;
@@ -304,11 +311,12 @@ int stream_send_request(struct stream *stream)
uint32_t size;
size = stream_pop_missing(stream);
pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
if (size == 0)
return 0;
pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
msg = message_alloc(impl, -1, 0);
message_put(msg,
TAG_U32, COMMAND_REQUEST,

View File

@@ -505,6 +505,10 @@ void pw_context_driver_add_listener(struct pw_context *context,
.listener = listener,
.events = events,
.data = data };
struct pw_impl_node *n;
spa_list_for_each(n, &context->driver_list, driver_link) {
SPA_FLAG_SET(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER);
}
pw_loop_invoke(context->data_loop,
do_add_listener, SPA_ID_INVALID, &d, sizeof(d), false, context);
}
@@ -521,6 +525,10 @@ SPA_EXPORT
void pw_context_driver_remove_listener(struct pw_context *context,
struct spa_hook *listener)
{
struct pw_impl_node *n;
spa_list_for_each(n, &context->driver_list, driver_link) {
SPA_FLAG_CLEAR(n->rt.target.activation->flags, PW_NODE_ACTIVATION_FLAG_PROFILER);
}
pw_loop_invoke(context->data_loop,
do_remove_listener, SPA_ID_INVALID, NULL, 0, true, listener);
}

View File

@@ -966,9 +966,9 @@ do_call_process(struct spa_loop *loop,
static void call_process(struct filter *impl)
{
pw_log_trace("%p: call process", impl);
pw_log_trace_fp("%p: call process", impl);
if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) {
spa_callbacks_call(&impl->rt_callbacks, struct pw_filter_events,
spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_filter_events,
process, 0, impl->rt.position);
}
else {
@@ -1928,17 +1928,16 @@ SPA_EXPORT
struct pw_buffer *pw_filter_dequeue_buffer(void *port_data)
{
struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data);
struct filter *impl = p->filter;
struct buffer *b;
int res;
if (SPA_UNLIKELY((b = pop_queue(p, &p->dequeued)) == NULL)) {
res = -errno;
pw_log_debug("%p: no more buffers: %m", impl);
pw_log_trace_fp("%p: no more buffers: %m", p->filter);
errno = -res;
return NULL;
}
pw_log_trace_fp("%p: dequeue buffer %d", impl, b->id);
pw_log_trace_fp("%p: dequeue buffer %d", p->filter, b->id);
return &b->this;
}

View File

@@ -70,7 +70,7 @@ static struct pw_node_peer *pw_node_peer_ref(struct pw_impl_node *onode, struct
peer->ref = 1;
peer->output = onode;
peer->active_count = 0;
peer->target = inode->rt.target;
copy_target(&peer->target, &inode->rt.target);
spa_list_append(&onode->peer_list, &peer->link);
pw_log_debug("new peer %p from %p to %p", peer, onode, inode);

View File

@@ -98,7 +98,7 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
}
/* trigger the driver when we complete */
this->rt.driver_target = driver->rt.target;
copy_target(&this->rt.driver_target, &driver->rt.target);
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
/* now increment the required states of all this node targets, including
@@ -1177,7 +1177,7 @@ static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_ac
a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f;
}
pw_log_trace_fp("%p: graph completed wait:%"PRIu64" run:%"PRIu64
" busy:%"PRIu64" period:%"PRIu64" cpu:%f:%f:%f", node,
" busy:%"PRIu64" period:%"PRIu64" cpu:%f:%f:%f", this,
a->awake_time - signal_time,
a->finish_time - a->awake_time,
process_time, period_time,
@@ -1242,6 +1242,7 @@ static inline int process_node(void *data)
trigger_targets(this, status, nsec);
} else {
/* calculate CPU time when finished */
a->signal_time = this->driver_start;
calculate_stats(this, a);
pw_context_driver_emit_complete(this->context, this);
}
@@ -1819,11 +1820,9 @@ again:
}
a->status = PW_NODE_ACTIVATION_TRIGGERED;
/* remote nodes set the signal_time before writing the ready
* eventfd */
if (!node->remote)
a->signal_time = nsec;
a->prev_signal_time = a->signal_time;
a->signal_time = nsec;
node->driver_start = nsec;
a->sync_timeout = SPA_MIN(min_timeout, DEFAULT_SYNC_TIMEOUT);
@@ -1850,9 +1849,7 @@ again:
a->status = PW_NODE_ACTIVATION_FINISHED;
a->finish_time = nsec;
}
if (!node->remote && (status & SPA_STATUS_HAVE_DATA)) {
/* remote nodes have done the output mix already before
* they wrote the ready eventfd */
if (status & SPA_STATUS_HAVE_DATA) {
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process_fast(p->mix);
}

View File

@@ -600,6 +600,16 @@ struct pw_node_target {
unsigned int active:1;
};
static inline void copy_target(struct pw_node_target *dst, const struct pw_node_target *src)
{
dst->id = src->id;
memcpy(dst->name, src->name, sizeof(dst->name));
dst->node = src->node;
dst->activation = src->activation;
dst->system = src->system;
dst->fd = src->fd;
}
struct pw_node_activation {
#define PW_NODE_ACTIVATION_NOT_TRIGGERED 0
#define PW_NODE_ACTIVATION_TRIGGERED 1
@@ -625,9 +635,13 @@ struct pw_node_activation {
* used when driver segment_owner has this node id */
/* for drivers, shared with all nodes */
uint32_t segment_owner[32]; /* id of owners for each segment info struct.
uint32_t segment_owner[16]; /* id of owners for each segment info struct.
* nodes that want to update segment info need to
* CAS their node id in this array. */
uint32_t padding[15];
#define PW_NODE_ACTIVATION_FLAG_NONE 0
#define PW_NODE_ACTIVATION_FLAG_PROFILER (1<<0) /* the profiler is running */
uint32_t flags; /* extra flags */
struct spa_io_position position; /* contains current position and segment info.
* extra info is updated by nodes that have set
* themselves as owner in the segment structs */
@@ -789,6 +803,8 @@ struct pw_impl_node {
struct spa_fraction target_rate;
uint64_t target_quantum;
uint64_t driver_start;
void *user_data; /**< extra user data */
};

View File

@@ -427,7 +427,7 @@ static inline void call_process(struct stream *impl)
if (impl->direction == SPA_DIRECTION_OUTPUT && update_requested(impl) <= 0)
return;
if (impl->process_rt)
spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
spa_callbacks_call_fast(&impl->rt_callbacks, struct pw_stream_events, process, 0);
else
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);