# 前言
本篇文章介绍 Halide 多级流水线调度优化策略。
本文来自于《Halide 官方教程》,读者可以去阅读原文。所以看本文的价值在于?呃…… 是中文的?(但原文肯定更准确)更简洁?(也许是缺点)画图更清楚?(假的,因为官网图更好,还是动图,我不想画了)。所以我也不知道为啥一定要看这篇文章而不是原文,唯一好处是我挑出了重点?官方教程文章太多,我只挑其中几篇重点,这是第一篇,全当自己记录了。
仍然建议看原文。
Halide 这篇教程的多级流水线(multi-stage pipelines)指的是多个计算步骤,第一个计算步骤的结果是第二个计算步骤的输入,怎样的流水线调度才能在空间占用、计算复杂度两个方面达到好的效率。这种场景和神经网络推理非常相似,神经网络是一个拓扑结构图,后续节点的输入来自前序节点的输出。所以本节对神经网络的推理优化也有借鉴意义。
参考链接:《Halide 官方教程》
作为初学者,错误在所难免,还望不吝赐教。
# Halide
Halide 是一种专门设计的领域特定语言(DSL),主要用于编写高性能的图像处理和数组计算程序。它由 MIT 和 Adobe Research 的研究人员于 2010 年左右开发,并因其在优化复杂图像处理流水线方面的卓越能力而受到广泛关注。Halide 的核心创新在于将算法(What to compute)与调度(How and when to compute it)明确分离。
Halide 通常作为 C++ 的嵌入式 DSL 使用。你用 C++ 编写程序,在其中定义 Halide 的函数和调度。
Halide 编译器会将 Halide 代码(算法 + 调度)编译成高效的 C++ 或 GPU 代码(如 CUDA, OpenCL, Metal),然后链接到你的主程序中。
它拥有一个强大的 JIT(即时编译)和 AOT(提前编译)编译系统。
# 多级流水线调度
Halide 这篇教程的多级流水线(multi-stage pipelines)指的是多个计算步骤,第一个计算步骤的结果是第二个计算步骤的输入,怎样的流水线调度才能在空间占用、计算复杂度两个方面达到好的效率。这种场景和神经网络推理非常相似,神经网络是一个拓扑结构图,后续节点的输入来自前序节点的输出。所以本节对神经网络的推理优化也有借鉴意义。
# 两级流水线默认调度
官网给的例子是 producer 和 consumer ,函数 producer 的输出数据作为 consumer 的输入。简单如下所示:

用以下代码来构建这个默认的例子:
Var x("x"), y("y"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
consumer.trace_stores(); | |
producer.trace_stores(); | |
consumer.realize({4, 4}); |
经过编译之后,等效的 C 代码如下:
float result[4][4]; | |
for (int y = 0; y < 4; y++) { | |
for (int x = 0; x < 4; x++) { | |
result[y][x] = (sin(x * y) + | |
sin(x * (y + 1)) + | |
sin((x + 1) * y) + | |
sin((x + 1) * (y + 1))) / 4; | |
} | |
} | |
printf("\n"); |
可以看到代码中间 producer 的位置已经全部替换为其函数内容,即默认的多级调度情况下,Halide 会将 producer 全部内联到 consumer 中。
# Root 调度
所有 producer 计算完毕之后,再计算 consumer ,官方例子中叫它 root 调度。
Func producer("producer_root"), consumer("consumer_root"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
producer.compute_root(); // 增加该指令 | |
// Turn on tracing. | |
consumer.trace_stores(); | |
producer.trace_stores(); | |
consumer.realize({4, 4}); |
和默认调度的差异是,增加了 compute_root() 这条指令。
动图展示为:(动图中黄色方块代表存储 [store],即计算完毕之后存储下来;蓝色方块代表加载 [load],即读取暂存数据,用于下一步计算。)

等效的 C 代码如下:
float result[4][4]; | |
// Allocate some temporary storage for the producer. | |
float producer_storage[5][5]; | |
// Compute the producer. | |
for (int y = 0; y < 5; y++) { | |
for (int x = 0; x < 5; x++) { | |
producer_storage[y][x] = sin(x * y); | |
} | |
} | |
// Compute the consumer. Skip the prints this time. | |
for (int y = 0; y < 4; y++) { | |
for (int x = 0; x < 4; x++) { | |
result[y][x] = (producer_storage[y][x] + | |
producer_storage[y + 1][x] + | |
producer_storage[y][x + 1] + | |
producer_storage[y + 1][x + 1]) / 4; | |
} | |
} |
值得注意的是,由于 consumer 和 producer 的特点,中间 Data producer_storage 是 5*5 的矩阵,最后输出 result 是 4*4 的矩阵。
从上方结果来看, compute_root() 使得代码走向 全部内联的反面,所有的 producer 结果都提前计算完成,然后由 consumer 读取中间结果,进行计算。
比较一下 “全部内联的默认调度” 和 “Root 调度” 两种算法的表现:
// Full inlining (the default schedule): | |
// - Temporary memory allocated: 0 | |
// - Loads: 0 | |
// - Stores: 16 | |
// - Calls to sin: 64 | |
// producer.compute_root(): | |
// - Temporary memory allocated: 25 floats | |
// - Loads: 64 | |
// - Stores: 41 | |
// - Calls to sin: 25 |
1. 内联方式,没有中间结果的临时存储,因此临时内存分配为 0;root 调度方式,中间存储了 5*5 的中间结果,所以临时内存分配为 25 floats
2. 内联方式,没有中间结果,所以也不需要 load; root 调度方式,输出的 4*4 个结果,每个都需要读取 4 个中间数据,所以 loads 有 64 次(可能忽略掉了两种方式都需要的、对 Input 数据的加载)
3. 内联方式只存储最终结果 4*4 ;root 调度方式,还需要存储中间结果的 5*5 ,所以是 41.
4. 内联方式,调用 Sin 函数达 64 次;而 root 调度方式,只需要调用 25 次。
两种方法各有优劣,全内联方式使用了最小的临时空间和内存带宽,但是在昂贵的复杂计算上花费了更多时间;root 调度方式正好相反。选择哪一种调度方法,需要取决于目标应用场景到底对什么更敏感,是空间存储,还是计算耗时。
# 每当足够计算一行 y 时,计算 y
全内联和 root 调度两种方式的中间选择。
Func producer("producer_y"), consumer("consumer_y"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
// Tell Halide to evaluate producer as needed per y coordinate of the consumer: | |
producer.compute_at(consumer, y); | |
// Turn on tracing. | |
producer.trace_stores(); | |
consumer.trace_stores(); | |
consumer.realize({4, 4}); |
动图结果:

等效 C 表示:
float result[4][4]; | |
// There's an outer loop over scanlines of consumer: | |
for (int y = 0; y < 4; y++) { | |
// Allocate space and compute enough of the producer to | |
// satisfy this single scanline of the consumer. This | |
// means a 5x2 box of the producer. | |
float producer_storage[2][5]; | |
for (int py = y; py < y + 2; py++) { | |
for (int px = 0; px < 5; px++) { | |
producer_storage[py - y][px] = sin(px * py); | |
} | |
} | |
// Compute a scanline of the consumer. | |
for (int x = 0; x < 4; x++) { | |
result[y][x] = (producer_storage[0][x] + | |
producer_storage[1][x] + | |
producer_storage[0][x + 1] + | |
producer_storage[1][x + 1]) / 4; | |
} | |
} |
compute_at() 指令使得算法先算完能够计算一行 consumer 数据的中间结果,然后算一行 consumer 。下一步计算第二行需要的中间结果,之后计算第二行 consumer 。可以看到计算第二行 Y 所需要的中间结果时,第一行 Y 所需要的中间结果并没有保留,这导致一部分 producer 的重复计算。
该调度形式的表现:
// producer.compute_at(consumer, y): | |
// - Temporary memory allocated: 10 floats | |
// - Loads: 64 | |
// - Stores: 56 | |
// - Calls to sin: 40 |
从上述分析看,所需要的临时存储空间,只有计算一行 Y 所需要的 10 个 floats 。多于全内联方式,但少于 root 调度方式,其复杂计算 sin 的计算次数也在两者之间。
# 存储中间变量
上一小节中只用了 10 个 floats 的临时存储空间,尽管相邻两行 Y 所需要的 Sin 函数有重复,但还是进行了重复计算。
本节通过 producer.store_root() 告知 Halide 存储所有中间变量。
Func producer("producer_root_y"), consumer("consumer_root_y"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
// Tell Halide to make a buffer to store all of producer at | |
// the outermost level: | |
producer.store_root(); | |
// ... but compute it as needed per y coordinate of the consumer. | |
producer.compute_at(consumer, y); | |
producer.trace_stores(); | |
consumer.trace_stores(); | |
printf("\nEvaluating producer.store_root().compute_at(consumer, y)\n"); | |
consumer.realize({4, 4}); |
动图示例:

等效 C 代码:
float result[4][4]; | |
// producer.store_root() implies that storage goes here: | |
float producer_storage[5][5]; | |
// There's an outer loop over scanlines of consumer: | |
for (int y = 0; y < 4; y++) { | |
// Compute enough of the producer to satisfy this scanline | |
// of the consumer. | |
for (int py = y; py < y + 2; py++) { | |
// Skip over rows of producer that we've already | |
// computed in a previous iteration. | |
if (y > 0 && py == y) continue; | |
for (int px = 0; px < 5; px++) { | |
producer_storage[py][px] = sin(px * py); | |
} | |
} | |
// Compute a scanline of the consumer. | |
for (int x = 0; x < 4; x++) { | |
result[y][x] = (producer_storage[y][x] + | |
producer_storage[y + 1][x] + | |
producer_storage[y][x + 1] + | |
producer_storage[y + 1][x + 1]) / 4; | |
} | |
} |
从结果可看, 5*5 的中间结果全部保留下来。最初计算一行 Y,需要 5*2 的 producer 结果,之后每计算 5*1 的 producer 结果,都可以计算一行 Y。重复计算就没有了。
该算法的表现:
// producer.store_root().compute_at(consumer, y): | |
// - Temporary memory allocated: 10 floats | |
// - Loads: 64 | |
// - Stores: 41 | |
// - Calls to sin: 25 |
观察该算法的表现数据,发现 Temporary memory allocated: 10 floats 存在问题,不应该是 25 个吗?
官网解释说:Halide 做了进一步优化,将两行中间中间结果的存储空间做成了环形缓冲区,因此只需要 10 个 floats 的空间。
因此实际的 C 等效代码为:
// Actually store 2 scanlines instead of 5 | |
float producer_storage[2][5]; | |
for (int y = 0; y < 4; y++) { | |
for (int py = y; py < y + 2; py++) { | |
if (y > 0 && py == y) continue; | |
for (int px = 0; px < 5; px++) { | |
// Stores to producer_storage have their y coordinate bit-masked. | |
producer_storage[py & 1][px] = sin(px * py); | |
} | |
} | |
// Compute a scanline of the consumer. | |
for (int x = 0; x < 4; x++) { | |
// Loads from producer_storage have their y coordinate bit-masked. | |
result[y][x] = (producer_storage[y & 1][x] + | |
producer_storage[(y + 1) & 1][x] + | |
producer_storage[y & 1][x + 1] + | |
producer_storage[(y + 1) & 1][x + 1]) / 4; | |
} | |
} | |
} |
# 最外层存储,最内层计算
使用 Halide 的 compute_at(consumer, x) (异于前述的 compute_at(consumer, y) )。那么它的表述意思就是 consumer 每得到能够计算一个结果的中间数据,即计算 consumer 。(异于前述的 compute_at(consumer, y) 为 consumer 每得到能够一行结果的中间数据,即计算 consumer )
Func producer("producer_root_x"), consumer("consumer_root_x"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
// Store outermost, compute innermost. | |
producer.store_root().compute_at(consumer, x); | |
producer.trace_stores(); | |
consumer.trace_stores(); | |
printf("\nEvaluating producer.store_root().compute_at(consumer, x)\n"); | |
consumer.realize({4, 4}); |
动图示例:

等效 C 代码:
float result[4][4]; | |
// producer.store_root() implies that storage goes here, but | |
// we can fold it down into a circular buffer of two | |
// scanlines: | |
float producer_storage[2][5]; | |
// For every pixel of the consumer: | |
for (int y = 0; y < 4; y++) { | |
for (int x = 0; x < 4; x++) { | |
// Compute enough of the producer to satisfy this | |
// pixel of the consumer, but skip values that we've | |
// already computed: | |
if (y == 0 && x == 0) { | |
producer_storage[y & 1][x] = sin(x * y); | |
} | |
if (y == 0) { | |
producer_storage[y & 1][x + 1] = sin((x + 1) * y); | |
} | |
if (x == 0) { | |
producer_storage[(y + 1) & 1][x] = sin(x * (y + 1)); | |
} | |
producer_storage[(y + 1) & 1][x + 1] = sin((x + 1) * (y + 1)); | |
result[y][x] = (producer_storage[y & 1][x] + | |
producer_storage[(y + 1) & 1][x] + | |
producer_storage[y & 1][x + 1] + | |
producer_storage[(y + 1) & 1][x + 1]) / 4; | |
} | |
} |
从动图中可以看到,中间数据的计算顺序和 compute_at(consumer, y) 情况下也不一样了。其首先得到四个中间结果,之后立即计算 producer 。
该算法的表现:
// producer.store_root().compute_at(consumer, x): | |
// - Temporary memory allocated: 10 floats | |
// - Loads: 48 | |
// - Stores: 41 | |
// - Calls to sin: 25 |
当前的算法算是迄今表现最好的。其中 Loads 的操作数为 48,而不是 16*4=64 ,官方文档解释说:每 4 个需要的 producer 值中,有一个可能仍然存在于寄存器中,不会将其视为加载,所以只需要加载其余 3 个,是 16*3=48 。 我也不太懂在运行过程中哪个中间值存在于寄存器中,但结果就是加载 48 次(先接受这个设定好了)。
既然现在的算法是表现最好的,那么为什么不在任何情况下都采用 producer.store_root().compute_at(consumer, x) 呢?原因是并行性,上述把空间利用到极限的情况下,并行就无法实现了。
在前面的两种策略中,我们都假设在之前的迭代中计算的值可以重用。这需要假定先前的 x 或 y 值发生在较早的时间并且已经完成。如果并行化或向量化任一循环,则不成立。如果并行化,Halide 将不会注入那些在 store_at 层和 compute_at 层之间存在并行循环时跳过已经完成的工作的优化,并且也不会将存储折叠到一个循环缓冲区中,这使得我们的 store_root 变得毫无意义。
# 综合应用
前述已经介绍了所有操作,本小节是综合应用。当然还涉及到【Halide】调度优化【1】中的知识。
Func producer("producer_tile"), consumer("consumer_tile"); | |
producer(x, y) = sin(x * y); | |
consumer(x, y) = (producer(x, y) + | |
producer(x, y + 1) + | |
producer(x + 1, y) + | |
producer(x + 1, y + 1)) / 4; | |
// We'll compute 8x8 of the consumer, in 4x4 tiles. | |
Var x_outer, y_outer, x_inner, y_inner; | |
consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4); | |
// Compute the producer per tile of the consumer | |
producer.compute_at(consumer, x_outer); | |
// Notice that I wrote my schedule starting from the end of | |
// the pipeline (the consumer). This is because the schedule | |
// for the producer refers to x_outer, which we introduced | |
// when we tiled the consumer. You can write it in the other | |
// order, but it tends to be harder to read. | |
// Turn on tracing. | |
producer.trace_stores(); | |
consumer.trace_stores(); | |
consumer.realize({8, 8}); |
上述过程切分出 4*4 的小块,然后使用 compute_at(consumer, x_outer) 指令,即获得足够计算 x_outer 的中间数据之后,即开始计算 x_outer 。
动图示例:

等效 C 代码:
float result[8][8]; | |
// For every tile of the consumer: | |
for (int y_outer = 0; y_outer < 2; y_outer++) { | |
for (int x_outer = 0; x_outer < 2; x_outer++) { | |
// Compute the x and y coords of the start of this tile. | |
int x_base = x_outer * 4; | |
int y_base = y_outer * 4; | |
// Compute enough of producer to satisfy this tile. A | |
// 4x4 tile of the consumer requires a 5x5 tile of the | |
// producer. | |
float producer_storage[5][5]; | |
for (int py = y_base; py < y_base + 5; py++) { | |
for (int px = x_base; px < x_base + 5; px++) { | |
producer_storage[py - y_base][px - x_base] = sin(px * py); | |
} | |
} | |
// Compute this tile of the consumer | |
for (int y_inner = 0; y_inner < 4; y_inner++) { | |
for (int x_inner = 0; x_inner < 4; x_inner++) { | |
int x = x_base + x_inner; | |
int y = y_base + y_inner; | |
result[y][x] = | |
(producer_storage[y - y_base][x - x_base] + | |
producer_storage[y - y_base + 1][x - x_base] + | |
producer_storage[y - y_base][x - x_base + 1] + | |
producer_storage[y - y_base + 1][x - x_base + 1]) / 4; | |
} | |
} | |
} | |
} |
# 最后
找到优秀的调度方法很难。我们最终在内存带宽、冗余工作和并行性之间进行了三方面的权衡。Halide 不能自动为您做出正确的选择(抱歉)。相反,它试图使您更容易探索各种选项,而不会弄乱您的程序。实际上,Halide 承诺像 compute_root 这样的调度调用不会改变你的算法的含义 —— 无论你如何调度,你都应该得到相同的结果。
Halide 不仅仅是矢量化和并行化你的代码。这不足以让你走得很远。Halide 为您提供了一些工具,帮助您快速探索局部性、冗余工作和并行性之间的不同权衡,而不会弄乱您试图计算的实际结果。
# 后记
本博客目前以及可预期的将来都不会支持评论功能。各位大侠如若有指教和问题,可以在我的 github 项目 或随便一个项目下提出 issue,并指明哪一篇博客,我看到一定及时回复!