From 345582dd15c516e403c4255db281c452a333a92b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 10 Mar 2023 17:39:51 +0100 Subject: [PATCH] module-rtp: add opus encoding --- meson.build | 3 + meson_options.txt | 4 + spa/include/spa/param/audio/format.h | 2 + spa/include/spa/param/audio/opus.h | 29 +++ spa/include/spa/param/format-types.h | 1 + spa/include/spa/param/format.h | 1 + spa/meson.build | 4 +- src/modules/meson.build | 6 +- src/modules/module-rtp/opus.c | 369 +++++++++++++++++++++++++++ src/modules/module-rtp/stream.c | 37 +++ 10 files changed, 452 insertions(+), 4 deletions(-) create mode 100644 spa/include/spa/param/audio/opus.h create mode 100644 src/modules/module-rtp/opus.c diff --git a/meson.build b/meson.build index 8d7363e83..095e568b4 100644 --- a/meson.build +++ b/meson.build @@ -282,6 +282,9 @@ else endif cdata.set('HAVE_PW_CAT_FFMPEG_INTEGRATION', pw_cat_ffmpeg.allowed()) +opus_dep = dependency('opus', required : get_option('opus')) +summary({'opus (Bluetooth, RTP)': opus_dep.found()}, bool_yn: true, section: 'Misc dependencies') + summary({'readline (for pw-cli)': readline_dep.found()}, bool_yn: true, section: 'Misc dependencies') cdata.set('HAVE_READLINE', readline_dep.found()) ncurses_dep = dependency('ncursesw', required : false) diff --git a/meson_options.txt b/meson_options.txt index e273deab6..9ca5cf821 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -318,3 +318,7 @@ option('rlimits-nice', min: -20, max: -1, value: -19) +option('opus', + description: 'Enable code that depends on opus', + type: 'feature', + value: 'auto') diff --git a/spa/include/spa/param/audio/format.h b/spa/include/spa/param/audio/format.h index 1b112280b..0619de394 100644 --- a/spa/include/spa/param/audio/format.h +++ b/spa/include/spa/param/audio/format.h @@ -28,6 +28,7 @@ extern "C" { #include #include #include +#include struct spa_audio_info { uint32_t media_type; @@ -46,6 +47,7 @@ struct spa_audio_info { struct spa_audio_info_alac alac; struct spa_audio_info_flac flac; struct spa_audio_info_ape ape; + struct spa_audio_info_ape opus; } info; }; diff --git a/spa/include/spa/param/audio/opus.h b/spa/include/spa/param/audio/opus.h new file mode 100644 index 000000000..f27dc6306 --- /dev/null +++ b/spa/include/spa/param/audio/opus.h @@ -0,0 +1,29 @@ +/* Simple Plugin API */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_AUDIO_OPUS_H +#define SPA_AUDIO_OPUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +struct spa_audio_info_opus { + uint32_t rate; /*< sample rate */ + uint32_t channels; /*< number of channels */ +}; + +#define SPA_AUDIO_INFO_OPUS_INIT(...) ((struct spa_audio_info_opus) { __VA_ARGS__ }) + +/** + * \} + */ + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_AUDIO_OPUS_H */ diff --git a/spa/include/spa/param/format-types.h b/spa/include/spa/param/format-types.h index 4f0c2af79..b7f52d71d 100644 --- a/spa/include/spa/param/format-types.h +++ b/spa/include/spa/param/format-types.h @@ -63,6 +63,7 @@ static const struct spa_type_info spa_type_media_subtype[] = { { SPA_MEDIA_SUBTYPE_alac, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "alac", NULL }, { SPA_MEDIA_SUBTYPE_flac, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "flac", NULL }, { SPA_MEDIA_SUBTYPE_ape, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "ape", NULL }, + { SPA_MEDIA_SUBTYPE_opus, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "opus", NULL }, /* video subtypes */ { SPA_MEDIA_SUBTYPE_h264, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "h264", NULL }, { SPA_MEDIA_SUBTYPE_mjpg, SPA_TYPE_Int, SPA_TYPE_INFO_MEDIA_SUBTYPE_BASE "mjpg", NULL }, diff --git a/spa/include/spa/param/format.h b/spa/include/spa/param/format.h index 41d59d2a7..3f075fd36 100644 --- a/spa/include/spa/param/format.h +++ b/spa/include/spa/param/format.h @@ -51,6 +51,7 @@ enum spa_media_subtype { SPA_MEDIA_SUBTYPE_alac, /** since 0.3.65 */ SPA_MEDIA_SUBTYPE_flac, /** since 0.3.65 */ SPA_MEDIA_SUBTYPE_ape, /** since 0.3.65 */ + SPA_MEDIA_SUBTYPE_opus, /** since 0.3.68 */ SPA_MEDIA_SUBTYPE_START_Video = 0x20000, SPA_MEDIA_SUBTYPE_h264, diff --git a/spa/meson.build b/spa/meson.build index 521d0382c..1cf7db0e7 100644 --- a/spa/meson.build +++ b/spa/meson.build @@ -74,7 +74,9 @@ if get_option('spa-plugins').allowed() endif endif summary({'LC3plus': lc3plus_dep.found()}, bool_yn: true, section: 'Bluetooth audio codecs') - opus_dep = dependency('opus', required : get_option('bluez5-codec-opus')) + if get_option('bluez5-codec-opus').enabled() and not opus_dep.found() + error('bluez5-codec-opus enabled, but opus dependency not found') + endif summary({'Opus': opus_dep.found()}, bool_yn: true, section: 'Bluetooth audio codecs') lc3_dep = dependency('lc3', required : get_option('bluez5-codec-lc3')) summary({'LC3': lc3_dep.found()}, bool_yn: true, section: 'Bluetooth audio codecs') diff --git a/src/modules/meson.build b/src/modules/meson.build index 967ec2fed..5eedf1978 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -525,7 +525,7 @@ pipewire_module_rtp_source = shared_library('pipewire-module-rtp-source', install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep], ) pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink', @@ -535,7 +535,7 @@ pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink', install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep], ) build_module_rtp_session = avahi_dep.found() @@ -548,7 +548,7 @@ if build_module_rtp_session install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep], + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep, opus_dep], ) endif diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c new file mode 100644 index 000000000..370a12388 --- /dev/null +++ b/src/modules/module-rtp/opus.c @@ -0,0 +1,369 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include + +struct rtp_opus_data { +}; + +static void rtp_opus_process_playback(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t wanted, timestamp, target_buffer, stride, maxsize; + int32_t avail; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + stride = impl->stride; + + maxsize = d[0].maxsize / stride; + wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; + + if (impl->io_position && impl->direct_timestamp) { + /* in direct mode, read directly from the timestamp index, + * because sender and receiver are in sync, this would keep + * target_buffer of samples available. */ + spa_ringbuffer_read_update(&impl->ring, + impl->io_position->clock.position); + } + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); + + target_buffer = impl->target_buffer; + + if (avail < (int32_t)wanted) { + enum spa_log_level level; + memset(d[0].data, 0, wanted * stride); + if (impl->have_sync) { + impl->have_sync = false; + level = SPA_LOG_LEVEL_WARN; + } else { + level = SPA_LOG_LEVEL_DEBUG; + } + pw_log(level, "underrun %d/%u < %u", + avail, target_buffer, wanted); + } else { + float error, corr; + if (impl->first) { + if ((uint32_t)avail > target_buffer) { + uint32_t skip = avail - target_buffer; + pw_log_debug("first: avail:%d skip:%u target:%u", + avail, skip, target_buffer); + timestamp += skip; + avail = target_buffer; + } + impl->first = false; + } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE2 / stride)) { + pw_log_warn("overrun %u > %u", avail, target_buffer * 8); + timestamp += avail - target_buffer; + avail = target_buffer; + } + if (!impl->direct_timestamp) { + /* when not using direct timestamp and clocks are not + * in sync, try to adjust our playback rate to keep the + * requested target_buffer bytes in the ringbuffer */ + error = (float)target_buffer - (float)avail; + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + + corr = spa_dll_update(&impl->dll, error); + + pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, + target_buffer, error, corr); + + if (impl->io_rate_match) { + SPA_FLAG_SET(impl->io_rate_match->flags, + SPA_IO_RATE_MATCH_FLAG_ACTIVE); + impl->io_rate_match->rate = 1.0f / corr; + } + } + spa_ringbuffer_read_data(&impl->ring, + impl->buffer, + BUFFER_SIZE2, + (timestamp * stride) & BUFFER_MASK2, + d[0].data, wanted * stride); + + timestamp += wanted; + spa_ringbuffer_read_update(&impl->ring, timestamp); + } + d[0].chunk->size = wanted * stride; + d[0].chunk->stride = stride; + d[0].chunk->offset = 0; + buf->size = wanted; + + pw_stream_queue_buffer(impl->stream, buf); +} + +static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len) +{ + struct rtp_header *hdr; + ssize_t hlen, plen; + uint16_t seq; + uint32_t timestamp, samples, write, expected_write; + uint32_t stride = impl->stride; + OpusMSDecoder *dec = impl->stream_data; + int32_t filled; + int res; + + if (len < 12) + goto short_packet; + + hdr = (struct rtp_header*)buffer; + if (hdr->v != 2) + goto invalid_version; + + hlen = 12 + hdr->cc * 4; + if (hlen > len) + goto invalid_len; + + if (impl->have_ssrc && impl->ssrc != hdr->ssrc) + goto unexpected_ssrc; + impl->ssrc = hdr->ssrc; + impl->have_ssrc = true; + + seq = ntohs(hdr->sequence_number); + if (impl->have_seq && impl->seq != seq) { + pw_log_info("unexpected seq (%d != %d) SSRC:%u", + seq, impl->seq, hdr->ssrc); + impl->have_sync = false; + } + impl->seq = seq + 1; + impl->have_seq = true; + + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; + + impl->receiving = true; + + plen = len - hlen; + + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); + + /* we always write to timestamp + delay */ + write = timestamp + impl->target_buffer; + + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u target:%u direct:%u", + timestamp, seq, impl->ts_offset, impl->ssrc, + impl->target_buffer, impl->direct_timestamp); + + /* we read from timestamp, keeping target_buffer of data + * in the ringbuffer. */ + impl->ring.readindex = timestamp; + impl->ring.writeindex = write; + filled = impl->target_buffer; + + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; + } else if (expected_write != write) { + pw_log_debug("unexpected write (%u != %u)", + write, expected_write); + } + + if (filled + plen > BUFFER_SIZE2 / stride) { + pw_log_debug("capture overrun %u + %zd > %u", filled, plen, + BUFFER_SIZE2 / stride); + impl->have_sync = false; + } else { + uint32_t index = (write * stride) & BUFFER_MASK2, end; + + res = opus_multistream_decode_float(dec, + &buffer[hlen], plen, + (float*)&impl->buffer[index], 2880, + 0); + + end = index + (res * stride); + /* fold to the lower part of the ringbuffer when overflow */ + if (end > BUFFER_SIZE2) + memmove(impl->buffer, &impl->buffer[BUFFER_SIZE2], end - BUFFER_SIZE2); + + pw_log_debug("receiving %zd len:%d timestamp:%d %u", plen, res, timestamp, index); + samples = res; + + write += samples; + spa_ringbuffer_write_update(&impl->ring, write); + } + return 0; + +short_packet: + pw_log_warn("short packet received"); + return -EINVAL; +invalid_version: + pw_log_warn("invalid RTP version"); + spa_debug_mem(0, buffer, len); + return -EPROTO; +invalid_len: + pw_log_warn("invalid RTP length"); + return -EINVAL; +unexpected_ssrc: + pw_log_warn("unexpected SSRC (expected %u != %u)", + impl->ssrc, hdr->ssrc); + return -EINVAL; +} + +static void rtp_opus_flush_packets(struct impl *impl) +{ + int32_t avail, tosend; + uint32_t stride, timestamp, offset; + uint8_t out[1280]; + struct iovec iov[2]; + struct rtp_header header; + OpusMSEncoder *enc = impl->stream_data; + int res = 0; + + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); + tosend = impl->psamples; + + if (avail < tosend) + return; + + stride = impl->stride; + + spa_zero(header); + header.v = 2; + header.pt = impl->payload; + header.ssrc = htonl(impl->ssrc); + + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + iov[1].iov_base = out; + iov[1].iov_len = 0; + + offset = 0; + while (avail >= tosend) { + header.sequence_number = htons(impl->seq); + header.timestamp = htonl(impl->ts_offset + timestamp); + + res = opus_multistream_encode_float(enc, + (const float*)&impl->buffer[offset * stride], tosend, + out, sizeof(out)); + + pw_log_debug("sending %d len:%d timestamp:%d", tosend, res, timestamp); + iov[1].iov_len = res; + + rtp_stream_emit_send_packet(impl, iov, 2); + + impl->seq++; + timestamp += tosend; + offset += tosend; + avail -= tosend; + } + + pw_log_debug("move %d offset:%d", avail, offset); + memmove(impl->buffer, &impl->buffer[offset * stride], avail * stride); + + spa_ringbuffer_read_update(&impl->ring, timestamp); +} + +static void rtp_opus_process_capture(void *data) +{ + struct impl *impl = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t offs, size, timestamp, expected_timestamp, stride; + int32_t filled, wanted; + + if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); + return; + } + d = buf->buffer->datas; + + offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); + size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); + stride = impl->stride; + wanted = size / stride; + + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); + + if (SPA_LIKELY(impl->io_position)) { + uint32_t rate = impl->io_position->clock.rate.denom; + timestamp = impl->io_position->clock.position * impl->rate / rate; + } else + timestamp = expected_timestamp; + + if (impl->have_sync) { + if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) { + pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); + impl->have_sync = false; + } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { + pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); + impl->have_sync = false; + } + } + if (!impl->have_sync) { + pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", + timestamp, impl->seq, impl->ts_offset, impl->ssrc); + impl->ring.readindex = impl->ring.writeindex = timestamp; + memset(impl->buffer, 0, BUFFER_SIZE); + impl->have_sync = true; + } + + spa_ringbuffer_write_data(&impl->ring, + impl->buffer, + BUFFER_SIZE, + (filled * stride) & BUFFER_MASK, + SPA_PTROFF(d[0].data, offs, void), wanted * stride); + timestamp += wanted; + spa_ringbuffer_write_update(&impl->ring, timestamp); + + pw_stream_queue_buffer(impl->stream, buf); + + rtp_opus_flush_packets(impl); +} + +static int rtp_opus_init(struct impl *impl, enum spa_direction direction) +{ + int err; + unsigned char mapping[64]; + uint32_t i; + + if (impl->psamples >= 2880) + impl->psamples = 2880; + else if (impl->psamples >= 1920) + impl->psamples = 1920; + else if (impl->psamples >= 960) + impl->psamples = 960; + else if (impl->psamples >= 480) + impl->psamples = 480; + else if (impl->psamples >= 240) + impl->psamples = 240; + else + impl->psamples = 120; + + for (i = 0; i < impl->info.info.opus.channels; i++) + mapping[i] = i; + + impl->receive_rtp = rtp_opus_receive; + if (direction == SPA_DIRECTION_INPUT) { + impl->stream_events.process = rtp_opus_process_capture; + + impl->stream_data = opus_multistream_encoder_create( + impl->info.info.opus.rate, + impl->info.info.opus.channels, + impl->info.info.opus.channels, 0, + mapping, + OPUS_APPLICATION_AUDIO, + &err); + } + else { + impl->stream_events.process = rtp_opus_process_playback; + + impl->stream_data = opus_multistream_decoder_create( + impl->info.info.opus.rate, + impl->info.info.opus.channels, + impl->info.info.opus.channels, 0, + mapping, + &err); + } + if (!impl->stream_data) + pw_log_error("opus error: %d", err); + return impl->stream_data ? 0 : err; +} diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index a474c45d3..35d342b5a 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -5,6 +5,9 @@ #include #include +#include +#include + #include #include #include @@ -46,6 +49,8 @@ struct impl { const struct format_info *format_info; + void *stream_data; + uint32_t rate; uint32_t stride; uint8_t payload; @@ -82,6 +87,7 @@ struct impl { #include "module-rtp/audio.c" #include "module-rtp/midi.c" +#include "module-rtp/opus.c" struct format_info { uint32_t media_subtype; @@ -98,6 +104,7 @@ static const struct format_info audio_format_info[] = { { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, + { SPA_MEDIA_SUBTYPE_opus, 0, 4, "opus", "audio" }, }; static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) @@ -285,6 +292,11 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; impl->payload = 0x61; } + else if (spa_streq(str, "opus")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; + impl->payload = 127; + } else { pw_log_error("unsupported media type:%s", str); res = -EINVAL; @@ -319,6 +331,25 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, if (impl->rate == 0) impl->rate = 10000; break; + case SPA_MEDIA_SUBTYPE_opus: + impl->stream_info.media_type = SPA_MEDIA_TYPE_audio; + impl->stream_info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + parse_audio_info(props, &impl->stream_info.info.raw); + impl->stream_info.info.raw.format = SPA_AUDIO_FORMAT_F32; + impl->info.info.opus.rate = impl->stream_info.info.raw.rate; + impl->info.info.opus.channels = impl->stream_info.info.raw.channels; + + impl->format_info = find_audio_format_info(&impl->info); + if (impl->format_info == NULL) { + pw_log_error("unsupported audio format:%d channels:%d", + impl->stream_info.info.raw.format, + impl->stream_info.info.raw.channels); + res = -EINVAL; + goto out; + } + impl->stride = impl->format_info->size * impl->stream_info.info.raw.channels; + impl->rate = impl->stream_info.info.raw.rate; + break; default: spa_assert_not_reached(); break; @@ -419,6 +450,12 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); rtp_midi_init(impl, direction); break; + case SPA_MEDIA_SUBTYPE_opus: + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &impl->stream_info); + flags |= PW_STREAM_FLAG_AUTOCONNECT; + rtp_opus_init(impl, direction); + break; default: res = -EINVAL; goto out;