fftools/ffmpeg: add thread-aware transcode scheduling infrastructure

See the comment block at the top of fftools/ffmpeg_sched.h for more
details on what this scheduler is for.

This commit adds the scheduling code itself, along with minimal
integration with the rest of the program:
* allocating and freeing the scheduler
* passing it throughout the call stack in order to register the
  individual components (demuxers/decoders/filtergraphs/encoders/muxers)
  with the scheduler

The scheduler is not actually used as of this commit, so it should not
result in any change in behavior. That will change in future commits.
This commit is contained in:
Anton Khirnov 2023-05-18 16:56:15 +02:00
parent ee2a8cbfd1
commit 9b8cc36ce0
13 changed files with 2936 additions and 56 deletions

View File

@ -18,6 +18,7 @@ OBJS-ffmpeg += \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_mux_init.o \
fftools/ffmpeg_opt.o \
fftools/ffmpeg_sched.o \
fftools/objpool.o \
fftools/sync_queue.o \
fftools/thread_queue.o \

View File

@ -99,6 +99,7 @@
#include "cmdutils.h"
#include "ffmpeg.h"
#include "ffmpeg_sched.h"
#include "ffmpeg_utils.h"
#include "sync_queue.h"
@ -1167,7 +1168,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
/*
* The following code is the main loop of the file converter
*/
static int transcode(int *err_rate_exceeded)
static int transcode(Scheduler *sch, int *err_rate_exceeded)
{
int ret = 0, i;
InputStream *ist;
@ -1305,6 +1306,8 @@ static int64_t getmaxrss(void)
int main(int argc, char **argv)
{
Scheduler *sch = NULL;
int ret, err_rate_exceeded;
BenchmarkTimeStamps ti;
@ -1322,8 +1325,14 @@ int main(int argc, char **argv)
show_banner(argc, argv, options);
sch = sch_alloc();
if (!sch) {
ret = AVERROR(ENOMEM);
goto finish;
}
/* parse options and open all input/output files */
ret = ffmpeg_parse_options(argc, argv);
ret = ffmpeg_parse_options(argc, argv, sch);
if (ret < 0)
goto finish;
@ -1341,7 +1350,7 @@ int main(int argc, char **argv)
}
current_time = ti = get_benchmark_time_stamps();
ret = transcode(&err_rate_exceeded);
ret = transcode(sch, &err_rate_exceeded);
if (ret >= 0 && do_benchmark) {
int64_t utime, stime, rtime;
current_time = get_benchmark_time_stamps();
@ -1361,5 +1370,8 @@ finish:
ret = 0;
ffmpeg_cleanup(ret);
sch_free(&sch);
return ret;
}

View File

@ -27,6 +27,7 @@
#include <signal.h>
#include "cmdutils.h"
#include "ffmpeg_sched.h"
#include "sync_queue.h"
#include "libavformat/avformat.h"
@ -721,7 +722,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id
int check_filter_outputs(void);
int filtergraph_is_simple(const FilterGraph *fg);
int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
char *graph_desc);
char *graph_desc,
Scheduler *sch, unsigned sch_idx_enc);
int init_complex_filtergraph(FilterGraph *fg);
int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
@ -746,7 +748,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
*/
int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec);
int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
unsigned sched_idx_enc);
/**
* Create a new filtergraph in the global filtergraph list.
@ -754,7 +757,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
* @param graph_desc Graph description; an av_malloc()ed string, filtergraph
* takes ownership of it.
*/
int fg_create(FilterGraph **pfg, char *graph_desc);
int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
@ -778,7 +781,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
*/
int reap_filters(FilterGraph *fg, int flush);
int ffmpeg_parse_options(int argc, char **argv);
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
const AVFrame *frame, const AVPacket *pkt,
@ -801,7 +804,7 @@ AVBufferRef *hw_device_for_filter(void);
int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
int dec_open(InputStream *ist);
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
/**
@ -815,7 +818,8 @@ void dec_free(Decoder **pdec);
*/
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
int enc_alloc(Encoder **penc, const AVCodec *codec);
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
int enc_open(OutputStream *ost, const AVFrame *frame);
@ -831,7 +835,7 @@ int enc_flush(void);
*/
int of_stream_init(OutputFile *of, OutputStream *ost);
int of_write_trailer(OutputFile *of);
int of_open(const OptionsContext *o, const char *filename);
int of_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void of_free(OutputFile **pof);
void of_enc_stats_close(void);
@ -845,7 +849,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
int64_t of_filesize(OutputFile *of);
int ifile_open(const OptionsContext *o, const char *filename);
int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void ifile_close(InputFile **f);
/**
@ -932,4 +936,8 @@ extern const char * const opt_name_frame_rates[];
extern const char * const opt_name_top_field_first[];
#endif
void *muxer_thread(void *arg);
void *decoder_thread(void *arg);
void *encoder_thread(void *arg);
#endif /* FFTOOLS_FFMPEG_H */

View File

@ -52,6 +52,9 @@ struct Decoder {
AVFrame *sub_prev[2];
AVFrame *sub_heartbeat;
Scheduler *sch;
unsigned sch_idx;
pthread_t thread;
/**
* Queue for sending coded packets from the main thread to
@ -673,7 +676,7 @@ fail:
return AVERROR(ENOMEM);
}
static void *decoder_thread(void *arg)
void *decoder_thread(void *arg)
{
InputStream *ist = arg;
InputFile *ifile = input_files[ist->file_index];
@ -1045,7 +1048,7 @@ static int hw_device_setup_for_decode(InputStream *ist)
return 0;
}
int dec_open(InputStream *ist)
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
{
Decoder *d;
const AVCodec *codec = ist->dec;
@ -1063,6 +1066,9 @@ int dec_open(InputStream *ist)
return ret;
d = ist->decoder;
d->sch = sch;
d->sch_idx = sch_idx;
if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) {
for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) {
d->sub_prev[i] = av_frame_alloc();

View File

@ -20,6 +20,7 @@
#include <stdint.h>
#include "ffmpeg.h"
#include "ffmpeg_sched.h"
#include "ffmpeg_utils.h"
#include "objpool.h"
#include "thread_queue.h"
@ -60,6 +61,9 @@ typedef struct DemuxStream {
// name used for logging
char log_name[32];
int sch_idx_stream;
int sch_idx_dec;
double ts_scale;
int streamcopy_needed;
@ -108,6 +112,7 @@ typedef struct Demuxer {
double readrate_initial_burst;
Scheduler *sch;
ThreadQueue *thread_queue;
int thread_queue_size;
pthread_t thread;
@ -780,7 +785,9 @@ void ifile_close(InputFile **pf)
static int ist_use(InputStream *ist, int decoding_needed)
{
Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]);
DemuxStream *ds = ds_from_ist(ist);
int ret;
if (ist->user_set_discard == AVDISCARD_ALL) {
av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n",
@ -788,13 +795,32 @@ static int ist_use(InputStream *ist, int decoding_needed)
return AVERROR(EINVAL);
}
if (ds->sch_idx_stream < 0) {
ret = sch_add_demux_stream(d->sch, d->f.index);
if (ret < 0)
return ret;
ds->sch_idx_stream = ret;
}
ist->discard = 0;
ist->st->discard = ist->user_set_discard;
ist->decoding_needed |= decoding_needed;
ds->streamcopy_needed |= !decoding_needed;
if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) {
int ret = dec_open(ist);
if (decoding_needed && ds->sch_idx_dec < 0) {
int is_audio = ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO;
ret = sch_add_dec(d->sch, decoder_thread, ist, d->loop && is_audio);
if (ret < 0)
return ret;
ds->sch_idx_dec = ret;
ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream),
SCH_DEC(ds->sch_idx_dec));
if (ret < 0)
return ret;
ret = dec_open(ist, d->sch, ds->sch_idx_dec);
if (ret < 0)
return ret;
}
@ -804,6 +830,7 @@ static int ist_use(InputStream *ist, int decoding_needed)
int ist_output_add(InputStream *ist, OutputStream *ost)
{
DemuxStream *ds = ds_from_ist(ist);
int ret;
ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0);
@ -816,11 +843,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
ist->outputs[ist->nb_outputs - 1] = ost;
return 0;
return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream;
}
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
{
DemuxStream *ds = ds_from_ist(ist);
int ret;
ret = ist_use(ist, is_simple ? DECODING_FOR_OST : DECODING_FOR_FILTER);
@ -838,7 +866,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
if (ret < 0)
return ret;
return 0;
return ds->sch_idx_dec;
}
static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st,
@ -970,6 +998,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st)
if (!ds)
return NULL;
ds->sch_idx_stream = -1;
ds->sch_idx_dec = -1;
ds->ist.st = st;
ds->ist.file_index = f->index;
ds->ist.index = st->index;
@ -1295,7 +1326,7 @@ static Demuxer *demux_alloc(void)
return d;
}
int ifile_open(const OptionsContext *o, const char *filename)
int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
{
Demuxer *d;
InputFile *f;
@ -1322,6 +1353,11 @@ int ifile_open(const OptionsContext *o, const char *filename)
f = &d->f;
ret = sch_add_demux(sch, input_thread, d);
if (ret < 0)
return ret;
d->sch = sch;
if (stop_time != INT64_MAX && recording_time != INT64_MAX) {
stop_time = INT64_MAX;
av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n");

View File

@ -56,6 +56,9 @@ struct Encoder {
int opened;
int finished;
Scheduler *sch;
unsigned sch_idx;
pthread_t thread;
/**
* Queue for sending frames from the main thread to
@ -113,7 +116,8 @@ void enc_free(Encoder **penc)
av_freep(penc);
}
int enc_alloc(Encoder **penc, const AVCodec *codec)
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx)
{
Encoder *enc;
@ -133,6 +137,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec)
if (!enc->pkt)
goto fail;
enc->sch = sch;
enc->sch_idx = sch_idx;
*penc = enc;
return 0;
@ -217,8 +224,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
return 0;
}
static void *encoder_thread(void *arg);
static int enc_thread_start(OutputStream *ost)
{
Encoder *e = ost->enc;
@ -1001,7 +1006,7 @@ fail:
return AVERROR(ENOMEM);
}
static void *encoder_thread(void *arg)
void *encoder_thread(void *arg)
{
OutputStream *ost = arg;
OutputFile *of = output_files[ost->file_index];

View File

@ -65,6 +65,9 @@ typedef struct FilterGraphPriv {
// frame for sending output to the encoder
AVFrame *frame_enc;
Scheduler *sch;
unsigned sch_idx;
pthread_t thread;
/**
* Queue for sending frames from the main thread to the filtergraph. Has
@ -735,14 +738,19 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
int ret;
int ret, dec_idx;
av_assert0(!ifp->ist);
ifp->ist = ist;
ifp->type_src = ist->st->codecpar->codec_type;
ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
if (dec_idx < 0)
return dec_idx;
ret = sch_connect(fgp->sch, SCH_DEC(dec_idx),
SCH_FILTER_IN(fgp->sch_idx, ifp->index));
if (ret < 0)
return ret;
@ -798,13 +806,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
return 0;
}
int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
unsigned sched_idx_enc)
{
const OutputFile *of = output_files[ost->file_index];
OutputFilterPriv *ofp = ofp_from_ofilter(ofilter);
FilterGraph *fg = ofilter->graph;
FilterGraphPriv *fgp = fgp_from_fg(fg);
const AVCodec *c = ost->enc_ctx->codec;
int ret;
av_assert0(!ofilter->ost);
@ -887,6 +897,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
break;
}
ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index),
SCH_ENC(sched_idx_enc));
if (ret < 0)
return ret;
fgp->nb_outputs_bound++;
av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
@ -1016,7 +1031,7 @@ static const AVClass fg_class = {
.category = AV_CLASS_CATEGORY_FILTER,
};
int fg_create(FilterGraph **pfg, char *graph_desc)
int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
{
FilterGraphPriv *fgp;
FilterGraph *fg;
@ -1037,6 +1052,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
fg->index = nb_filtergraphs - 1;
fgp->graph_desc = graph_desc;
fgp->disable_conversions = !auto_conversion_filters;
fgp->sch = sch;
snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index);
@ -1104,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
goto fail;
}
ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs,
filter_thread, fgp);
if (ret < 0)
goto fail;
fgp->sch_idx = ret;
fail:
avfilter_inout_free(&inputs);
avfilter_inout_free(&outputs);
@ -1116,13 +1138,14 @@ fail:
}
int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
char *graph_desc)
char *graph_desc,
Scheduler *sch, unsigned sched_idx_enc)
{
FilterGraph *fg;
FilterGraphPriv *fgp;
int ret;
ret = fg_create(&fg, graph_desc);
ret = fg_create(&fg, graph_desc, sch);
if (ret < 0)
return ret;
fgp = fgp_from_fg(fg);
@ -1148,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
if (ret < 0)
return ret;
ret = ofilter_bind_ost(fg->outputs[0], ost);
ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc);
if (ret < 0)
return ret;

View File

@ -297,7 +297,7 @@ fail:
return AVERROR(ENOMEM);
}
static void *muxer_thread(void *arg)
void *muxer_thread(void *arg)
{
Muxer *mux = arg;
OutputFile *of = &mux->of;
@ -580,7 +580,9 @@ static int thread_start(Muxer *mux)
return 0;
}
static int print_sdp(void)
int print_sdp(const char *filename);
int print_sdp(const char *filename)
{
char sdp[16384];
int i;
@ -613,19 +615,18 @@ static int print_sdp(void)
if (ret < 0)
goto fail;
if (!sdp_filename) {
if (!filename) {
printf("SDP:\n%s\n", sdp);
fflush(stdout);
} else {
ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL);
ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename);
av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename);
goto fail;
}
avio_print(sdp_pb, sdp);
avio_closep(&sdp_pb);
av_freep(&sdp_filename);
}
// SDP successfully written, allow muxer threads to start
@ -661,7 +662,7 @@ int mux_check_init(Muxer *mux)
nb_output_dumped++;
if (sdp_filename || want_sdp) {
ret = print_sdp();
ret = print_sdp(sdp_filename);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
return ret;
@ -984,6 +985,8 @@ void of_free(OutputFile **pof)
ost_free(&of->streams[i]);
av_freep(&of->streams);
av_freep(&mux->sch_stream_idx);
av_dict_free(&mux->opts);
av_packet_free(&mux->sq_pkt);

View File

@ -24,6 +24,7 @@
#include <stdatomic.h>
#include <stdint.h>
#include "ffmpeg_sched.h"
#include "thread_queue.h"
#include "libavformat/avformat.h"
@ -50,6 +51,10 @@ typedef struct MuxStream {
EncStats stats;
int sch_idx;
int sch_idx_enc;
int sch_idx_src;
int64_t max_frames;
/*
@ -94,6 +99,13 @@ typedef struct Muxer {
AVFormatContext *fc;
Scheduler *sch;
unsigned sch_idx;
// OutputStream indices indexed by scheduler stream indices
int *sch_stream_idx;
int nb_sch_stream_idx;
pthread_t thread;
ThreadQueue *tq;

View File

@ -23,6 +23,7 @@
#include "cmdutils.h"
#include "ffmpeg.h"
#include "ffmpeg_mux.h"
#include "ffmpeg_sched.h"
#include "fopen_utf8.h"
#include "libavformat/avformat.h"
@ -436,6 +437,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type)
ms->ost.class = &output_stream_class;
ms->sch_idx = -1;
ms->sch_idx_enc = -1;
snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d",
type_str ? *type_str : '?', mux->of.index, ms->ost.index);
@ -1127,6 +1131,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ms)
return AVERROR(ENOMEM);
// only streams with sources (i.e. not attachments)
// are handled by the scheduler
if (ist || ofilter) {
ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx);
if (ret < 0)
return ret;
ret = sch_add_mux_stream(mux->sch, mux->sch_idx);
if (ret < 0)
return ret;
av_assert0(ret == mux->nb_sch_stream_idx - 1);
mux->sch_stream_idx[ret] = ms->ost.index;
ms->sch_idx = ret;
}
ost = &ms->ost;
if (o->streamid) {
@ -1170,7 +1190,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->enc_ctx)
return AVERROR(ENOMEM);
ret = enc_alloc(&ost->enc, enc);
ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
if (ret < 0)
return ret;
ms->sch_idx_enc = ret;
ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sch_idx_enc);
if (ret < 0)
return ret;
@ -1380,11 +1405,19 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
ost->enc_ctx->global_quality = FF_QP2LAMBDA * qscale;
}
ms->max_muxing_queue_size = 128;
MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, ms->max_muxing_queue_size, oc, st);
if (ms->sch_idx >= 0) {
int max_muxing_queue_size = 128;
int muxing_queue_data_threshold = 50 * 1024 * 1024;
ms->muxing_queue_data_threshold = 50*1024*1024;
MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, ms->muxing_queue_data_threshold, oc, st);
MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, max_muxing_queue_size, oc, st);
MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, muxing_queue_data_threshold, oc, st);
sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
max_muxing_queue_size, muxing_queue_data_threshold);
ms->max_muxing_queue_size = max_muxing_queue_size;
ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
}
MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
oc, st);
@ -1425,23 +1458,47 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
(type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) {
if (ofilter) {
ost->filter = ofilter;
ret = ofilter_bind_ost(ofilter, ost);
ret = ofilter_bind_ost(ofilter, ost, ms->sch_idx_enc);
if (ret < 0)
return ret;
} else {
ret = init_simple_filtergraph(ost->ist, ost, filters);
ret = init_simple_filtergraph(ost->ist, ost, filters,
mux->sch, ms->sch_idx_enc);
if (ret < 0) {
av_log(ost, AV_LOG_ERROR,
"Error initializing a simple filtergraph\n");
return ret;
}
}
ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc),
SCH_MSTREAM(mux->sch_idx, ms->sch_idx));
if (ret < 0)
return ret;
} else if (ost->ist) {
ret = ist_output_add(ost->ist, ost);
if (ret < 0) {
int sched_idx = ist_output_add(ost->ist, ost);
if (sched_idx < 0) {
av_log(ost, AV_LOG_ERROR,
"Error binding an input stream\n");
return ret;
return sched_idx;
}
ms->sch_idx_src = sched_idx;
if (ost->enc) {
ret = sch_connect(mux->sch, SCH_DEC(sched_idx),
SCH_ENC(ms->sch_idx_enc));
if (ret < 0)
return ret;
ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc),
SCH_MSTREAM(mux->sch_idx, ms->sch_idx));
if (ret < 0)
return ret;
} else {
ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx),
SCH_MSTREAM(ost->file_index, ms->sch_idx));
if (ret < 0)
return ret;
}
}
@ -1837,6 +1894,26 @@ static int create_streams(Muxer *mux, const OptionsContext *o)
if (ret < 0)
return ret;
// setup fix_sub_duration_heartbeat mappings
for (unsigned i = 0; i < oc->nb_streams; i++) {
MuxStream *src = ms_from_ost(mux->of.streams[i]);
if (!src->ost.fix_sub_duration_heartbeat)
continue;
for (unsigned j = 0; j < oc->nb_streams; j++) {
MuxStream *dst = ms_from_ost(mux->of.streams[j]);
if (src == dst || dst->ost.type != AVMEDIA_TYPE_SUBTITLE ||
!dst->ost.enc || !dst->ost.ist || !dst->ost.ist->fix_sub_duration)
continue;
ret = sch_mux_sub_heartbeat_add(mux->sch, mux->sch_idx, src->sch_idx,
dst->sch_idx_src);
}
}
if (!oc->nb_streams && !(oc->oformat->flags & AVFMT_NOSTREAMS)) {
av_dump_format(oc, nb_output_files - 1, oc->url, 1);
av_log(mux, AV_LOG_ERROR, "Output file does not contain any stream\n");
@ -2621,7 +2698,7 @@ static Muxer *mux_alloc(void)
return mux;
}
int of_open(const OptionsContext *o, const char *filename)
int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
{
Muxer *mux;
AVFormatContext *oc;
@ -2691,6 +2768,13 @@ int of_open(const OptionsContext *o, const char *filename)
AVFMT_FLAG_BITEXACT);
}
err = sch_add_mux(sch, muxer_thread, NULL, mux,
!strcmp(oc->oformat->name, "rtp"));
if (err < 0)
return err;
mux->sch = sch;
mux->sch_idx = err;
/* create all output streams for this file */
err = create_streams(mux, o);
if (err < 0)

View File

@ -28,6 +28,7 @@
#endif
#include "ffmpeg.h"
#include "ffmpeg_sched.h"
#include "cmdutils.h"
#include "opt_common.h"
#include "sync_queue.h"
@ -1157,20 +1158,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg)
static int opt_filter_complex(void *optctx, const char *opt, const char *arg)
{
Scheduler *sch = optctx;
char *graph_desc = av_strdup(arg);
if (!graph_desc)
return AVERROR(ENOMEM);
return fg_create(NULL, graph_desc);
return fg_create(NULL, graph_desc, sch);
}
static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg)
{
Scheduler *sch = optctx;
char *graph_desc = file_read(arg);
if (!graph_desc)
return AVERROR(EINVAL);
return fg_create(NULL, graph_desc);
return fg_create(NULL, graph_desc, sch);
}
void show_help_default(const char *opt, const char *arg)
@ -1262,8 +1265,9 @@ static const OptionGroupDef groups[] = {
[GROUP_INFILE] = { "input url", "i", OPT_INPUT },
};
static int open_files(OptionGroupList *l, const char *inout,
int (*open_file)(const OptionsContext*, const char*))
static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch,
int (*open_file)(const OptionsContext*, const char*,
Scheduler*))
{
int i, ret;
@ -1283,7 +1287,7 @@ static int open_files(OptionGroupList *l, const char *inout,
}
av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg);
ret = open_file(&o, g->arg);
ret = open_file(&o, g->arg, sch);
uninit_options(&o);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n",
@ -1296,7 +1300,7 @@ static int open_files(OptionGroupList *l, const char *inout,
return 0;
}
int ffmpeg_parse_options(int argc, char **argv)
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch)
{
OptionParseContext octx;
const char *errmsg = NULL;
@ -1313,7 +1317,7 @@ int ffmpeg_parse_options(int argc, char **argv)
}
/* apply global options */
ret = parse_optgroup(NULL, &octx.global_opts);
ret = parse_optgroup(sch, &octx.global_opts);
if (ret < 0) {
errmsg = "parsing global options";
goto fail;
@ -1323,7 +1327,7 @@ int ffmpeg_parse_options(int argc, char **argv)
term_init();
/* open input files */
ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open);
ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open);
if (ret < 0) {
errmsg = "opening input files";
goto fail;
@ -1337,7 +1341,7 @@ int ffmpeg_parse_options(int argc, char **argv)
}
/* open output files */
ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open);
ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open);
if (ret < 0) {
errmsg = "opening output files";
goto fail;

2218
fftools/ffmpeg_sched.c Normal file

File diff suppressed because it is too large Load Diff

468
fftools/ffmpeg_sched.h Normal file
View File

@ -0,0 +1,468 @@
/*
* Inter-thread scheduling/synchronization.
* Copyright (c) 2023 Anton Khirnov
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef FFTOOLS_FFMPEG_SCHED_H
#define FFTOOLS_FFMPEG_SCHED_H
#include <stddef.h>
#include <stdint.h>
#include "ffmpeg_utils.h"
/*
* This file contains the API for the transcode scheduler.
*
* Overall architecture of the transcoding process involves instances of the
* following components:
* - demuxers, each containing any number of demuxed streams; demuxed packets
* belonging to some stream are sent to any number of decoders (transcoding)
* and/or muxers (streamcopy);
* - decoders, which receive encoded packets from some demuxed stream, decode
* them, and send decoded frames to any number of filtergraph inputs
* (audio/video) or encoders (subtitles);
* - filtergraphs, each containing zero or more inputs (0 in case the
* filtergraph contains a lavfi source filter), and one or more outputs; the
* inputs and outputs need not have matching media types;
* each filtergraph input receives decoded frames from some decoder;
* filtered frames from each output are sent to some encoder;
* - encoders, which receive decoded frames from some decoder (subtitles) or
* some filtergraph output (audio/video), encode them, and send encoded
* packets to some muxed stream;
* - muxers, each containing any number of muxed streams; each muxed stream
* receives encoded packets from some demuxed stream (streamcopy) or some
* encoder (transcoding); those packets are interleaved and written out by the
* muxer.
*
* There must be at least one muxer instance, otherwise the transcode produces
* no output and is meaningless. Otherwise, in a generic transcoding scenario
* there may be arbitrary number of instances of any of the above components,
* interconnected in various ways.
*
* The code tries to keep all the output streams across all the muxers in sync
* (i.e. at the same DTS), which is accomplished by varying the rates at which
* packets are read from different demuxers and lavfi sources. Note that the
* degree of control we have over synchronization is fundamentally limited - if
* some demuxed streams in the same input are interleaved at different rates
* than that at which they are to be muxed (e.g. because an input file is badly
* interleaved, or the user changed their speed by mismatching amounts), then
* there will be increasing amounts of buffering followed by eventual
* transcoding failure.
*
* N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g.
* - encoding and muxing output from filtergraph(s) that have no inputs;
* - creating a file that contains nothing but attachments and/or metadata.
*
* N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but
* this is unnecessary because the (a)split filter provides the same
* functionality.
*
* The scheduler, in the above model, is the master object that oversees and
* facilitates the transcoding process. The basic idea is that all instances
* of the abovementioned components communicate only with the scheduler and not
* with each other. The scheduler is then the single place containing the
* knowledge about the whole transcoding pipeline.
*/
struct AVFrame;
struct AVPacket;
typedef struct Scheduler Scheduler;
enum SchedulerNodeType {
SCH_NODE_TYPE_NONE = 0,
SCH_NODE_TYPE_DEMUX,
SCH_NODE_TYPE_MUX,
SCH_NODE_TYPE_DEC,
SCH_NODE_TYPE_ENC,
SCH_NODE_TYPE_FILTER_IN,
SCH_NODE_TYPE_FILTER_OUT,
};
typedef struct SchedulerNode {
enum SchedulerNodeType type;
unsigned idx;
unsigned idx_stream;
} SchedulerNode;
typedef void* (*SchThreadFunc)(void *arg);
#define SCH_DSTREAM(file, stream) \
(SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, \
.idx = file, .idx_stream = stream }
#define SCH_MSTREAM(file, stream) \
(SchedulerNode){ .type = SCH_NODE_TYPE_MUX, \
.idx = file, .idx_stream = stream }
#define SCH_DEC(decoder) \
(SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \
.idx = decoder }
#define SCH_ENC(encoder) \
(SchedulerNode){ .type = SCH_NODE_TYPE_ENC, \
.idx = encoder }
#define SCH_FILTER_IN(filter, input) \
(SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, \
.idx = filter, .idx_stream = input }
#define SCH_FILTER_OUT(filter, output) \
(SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, \
.idx = filter, .idx_stream = output }
Scheduler *sch_alloc(void);
void sch_free(Scheduler **sch);
int sch_start(Scheduler *sch);
int sch_stop(Scheduler *sch);
/**
* Wait until transcoding terminates or the specified timeout elapses.
*
* @param timeout_us Amount of time in microseconds after which this function
* will timeout.
* @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for
* informational purposes only.
*
* @retval 0 waiting timed out, transcoding is not finished
* @retval 1 transcoding is finished
*/
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts);
/**
* Add a demuxer to the scheduler.
*
* @param func Function executed as the demuxer task.
* @param ctx Demuxer state; will be passed to func and used for logging.
*
* @retval ">=0" Index of the newly-created demuxer.
* @retval "<0" Error code.
*/
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx);
/**
* Add a demuxed stream for a previously added demuxer.
*
* @param demux_idx index previously returned by sch_add_demux()
*
* @retval ">=0" Index of the newly-created demuxed stream.
* @retval "<0" Error code.
*/
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);
/**
* Add a decoder to the scheduler.
*
* @param func Function executed as the decoder task.
* @param ctx Decoder state; will be passed to func and used for logging.
* @param send_end_ts The decoder will return an end timestamp after flush packets
* are delivered to it. See documentation for
* sch_dec_receive() for more details.
*
* @retval ">=0" Index of the newly-created decoder.
* @retval "<0" Error code.
*/
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
int send_end_ts);
/**
* Add a filtergraph to the scheduler.
*
* @param nb_inputs Number of filtergraph inputs.
* @param nb_outputs number of filtergraph outputs
* @param func Function executed as the filtering task.
* @param ctx Filter state; will be passed to func and used for logging.
*
* @retval ">=0" Index of the newly-created filtergraph.
* @retval "<0" Error code.
*/
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
SchThreadFunc func, void *ctx);
/**
* Add a muxer to the scheduler.
*
* Note that muxer thread startup is more complicated than for other components,
* because
* - muxer streams fed by audio/video encoders become initialized dynamically at
* runtime, after those encoders receive their first frame and initialize
* themselves, followed by calling sch_mux_stream_ready()
* - the header can be written after all the streams for a muxer are initialized
* - we may need to write an SDP, which must happen
* - AFTER all the headers are written
* - BEFORE any packets are written by any muxer
* - with all the muxers quiescent
* To avoid complicated muxer-thread synchronization dances, we postpone
* starting the muxer threads until after the SDP is written. The sequence of
* events is then as follows:
* - After sch_mux_stream_ready() is called for all the streams in a given muxer,
* the header for that muxer is written (care is taken that headers for
* different muxers are not written concurrently, since they write file
* information to stderr). If SDP is not wanted, the muxer thread then starts
* and muxing begins.
* - When SDP _is_ wanted, no muxer threads start until the header for the last
* muxer is written. After that, the SDP is written, after which all the muxer
* threads are started at once.
*
* In order for the above to work, the scheduler needs to be able to invoke
* just writing the header, which is the reason the init parameter exists.
*
* @param func Function executed as the muxing task.
* @param init Callback that is called to initialize the muxer and write the
* header. Called after sch_mux_stream_ready() is called for all the
* streams in the muxer.
* @param ctx Muxer state; will be passed to func/init and used for logging.
* @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
*
* @retval ">=0" Index of the newly-created muxer.
* @retval "<0" Error code.
*/
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
void *ctx, int sdp_auto);
/**
* Add a muxed stream for a previously added muxer.
*
* @param mux_idx index previously returned by sch_add_mux()
*
* @retval ">=0" Index of the newly-created muxed stream.
* @retval "<0" Error code.
*/
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx);
/**
* Configure limits on packet buffering performed before the muxer task is
* started.
*
* @param mux_idx index previously returned by sch_add_mux()
* @param stream_idx_idx index previously returned by sch_add_mux_stream()
* @param data_threshold Total size of the buffered packets' data after which
* max_packets applies.
* @param max_packets maximum Maximum number of buffered packets after
* data_threshold is reached.
*/
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
size_t data_threshold, int max_packets);
/**
* Signal to the scheduler that the specified muxed stream is initialized and
* ready. Muxing is started once all the streams are ready.
*/
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
/**
* Set the file path for the SDP.
*
* The SDP is written when either of the following is true:
* - this function is called at least once
* - sdp_auto=1 is passed to EVERY call of sch_add_mux()
*/
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename);
/**
* Add an encoder to the scheduler.
*
* @param func Function executed as the encoding task.
* @param ctx Encoder state; will be passed to func and used for logging.
* @param open_cb This callback, if specified, will be called when the first
* frame is obtained for this encoder. For audio encoders with a
* fixed frame size (which use a sync queue in the scheduler to
* rechunk frames), it must return that frame size on success.
* Otherwise (non-audio, variable frame size) it should return 0.
*
* @retval ">=0" Index of the newly-created encoder.
* @retval "<0" Error code.
*/
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
int (*open_cb)(void *func_arg, const struct AVFrame *frame));
/**
* Add an pre-encoding sync queue to the scheduler.
*
* @param buf_size_us Sync queue buffering size, passed to sq_alloc().
* @param logctx Logging context for the sync queue. passed to sq_alloc().
*
* @retval ">=0" Index of the newly-created sync queue.
* @retval "<0" Error code.
*/
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx);
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
int limiting, uint64_t max_frames);
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst);
enum DemuxSendFlags {
/**
* Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations
* send normally to other types.
*/
DEMUX_SEND_STREAMCOPY_EOF = (1 << 0),
};
/**
* Called by demuxer tasks to communicate with their downstreams. The following
* may be sent:
* - a demuxed packet for the stream identified by pkt->stream_index;
* - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an
* empty packet with stream_index=-1.
*
* @param demux_idx demuxer index
* @param pkt A demuxed packet to send.
* When flushing (i.e. pkt->stream_index=-1 on entry to this
* function), on successful return pkt->pts/pkt->time_base will be
* set to the maximum end timestamp of any decoded audio stream, or
* AV_NOPTS_VALUE if no decoded audio streams are present.
*
* @retval "non-negative value" success
* @retval AVERROR_EOF all consumers for the stream are done
* @retval AVERROR_EXIT all consumers are done, should terminate demuxing
* @retval "anoter negative error code" other failure
*/
int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt,
unsigned flags);
/**
* Called by decoder tasks to receive a packet for decoding.
*
* @param dec_idx decoder index
* @param pkt Input packet will be written here on success.
*
* An empty packet signals that the decoder should be flushed, but
* more packets will follow (e.g. after seeking). When a decoder
* created with send_end_ts=1 receives a flush packet, it must write
* the end timestamp of the stream after flushing to
* pkt->pts/time_base on the next call to this function (if any).
*
* @retval "non-negative value" success
* @retval AVERROR_EOF no more packets will arrive, should terminate decoding
* @retval "another negative error code" other failure
*/
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt);
/**
* Called by decoder tasks to send a decoded frame downstream.
*
* @param dec_idx Decoder index previously returned by sch_add_dec().
* @param frame Decoded frame; on success it is consumed and cleared by this
* function
*
* @retval ">=0" success
* @retval AVERROR_EOF all consumers are done, should terminate decoding
* @retval "another negative error code" other failure
*/
int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame);
/**
* Called by filtergraph tasks to obtain frames for filtering. Will wait for a
* frame to become available and return it in frame.
*
* Filtergraphs that contain lavfi sources and do not currently require new
* input frames should call this function as a means of rate control - then
* in_idx should be set equal to nb_inputs on entry to this function.
*
* @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
* @param[in,out] in_idx On input contains the index of the input on which a frame
* is most desired. May be set to nb_inputs to signal that
* the filtergraph does not need more input currently.
*
* On success, will be replaced with the input index of
* the actually returned frame or EOF timestamp.
*
* @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx
* contains the index of the input it belongs to.
* @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should
* resume filtering. May only be returned when
* in_idx=nb_inputs on entry to this function.
* @retval AVERROR_EOF No more frames will arrive, should terminate filtering.
*/
int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
unsigned *in_idx, struct AVFrame *frame);
/**
* Called by filtergraph tasks to send a filtered frame or EOF to consumers.
*
* @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
* @param out_idx Index of the output which produced the frame.
* @param frame The frame to send to consumers. When NULL, signals that no more
* frames will be produced for the specified output. When non-NULL,
* the frame is consumed and cleared by this function on success.
*
* @retval "non-negative value" success
* @retval AVERROR_EOF all consumers are done
* @retval "anoter negative error code" other failure
*/
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx,
struct AVFrame *frame);
int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
/**
* Called by encoder tasks to obtain frames for encoding. Will wait for a frame
* to become available and return it in frame.
*
* @param enc_idx Encoder index previously returned by sch_add_enc().
* @param frame Newly-received frame will be stored here on success. Must be
* clean on entrance to this function.
*
* @retval 0 A frame was successfully delivered into frame.
* @retval AVERROR_EOF No more frames will be delivered, the encoder should
* flush everything and terminate.
*
*/
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame);
/**
* Called by encoder tasks to send encoded packets downstream.
*
* @param enc_idx Encoder index previously returned by sch_add_enc().
* @param pkt An encoded packet; it will be consumed and cleared by this
* function on success.
*
* @retval 0 success
* @retval "<0" Error code.
*/
int sch_enc_send (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt);
/**
* Called by muxer tasks to obtain packets for muxing. Will wait for a packet
* for any muxed stream to become available and return it in pkt.
*
* @param mux_idx Muxer index previously returned by sch_add_mux().
* @param pkt Newly-received packet will be stored here on success. Must be
* clean on entrance to this function.
*
* @retval 0 A packet was successfully delivered into pkt. Its stream_index
* corresponds to a stream index previously returned from
* sch_add_mux_stream().
* @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that
* no more packets will be delivered for this stream index.
* Otherwise this indicates that no more packets will be
* delivered for any stream and the muxer should therefore
* flush everything and terminate.
*/
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt);
/**
* Called by muxer tasks to signal that a stream will no longer accept input.
*
* @param stream_idx Stream index previously returned from sch_add_mux_stream().
*/
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
unsigned dec_idx);
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
const AVPacket *pkt);
#endif /* FFTOOLS_FFMPEG_SCHED_H */