Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lower stream-parallelized LinearOp into Host IR AG+GEMM overlap algo #3736

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions csrc/host_ir/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,35 @@ void HostIrEvaluator::handle(MatmulOp* matmul) {
}
}

void HostIrEvaluator::handle(LinearOp* linear) {
TensorView* in = linear->inA()->as<TensorView>();
TensorView* weight = linear->inB()->as<TensorView>();
TensorView* bias = linear->bias()->as<TensorView>();
TensorView* out = linear->out()->as<TensorView>();
NVF_ERROR(
expr_evaluator_.isKnown(in) && expr_evaluator_.isKnown(weight) &&
(!linear->has_bias() || expr_evaluator_.isKnown(bias)),
"Inputs of the Linear Op ",
linear->toString(),
"must be precomputed before being retrieved");

if (!expr_evaluator_.isKnown(out)) {
unhandled(linear);
return;
}

auto in_at = expr_evaluator_.evaluate(in).as<at::Tensor>();
auto weight_at = expr_evaluator_.evaluate(weight).as<at::Tensor>();
auto bias_at = expr_evaluator_.evaluate(bias).as<at::Tensor>();
auto out_at = expr_evaluator_.evaluate(out).as<at::Tensor>();

if (linear->has_bias()) {
at::linear_out(out_at, in_at, weight_at.squeeze(), bias_at.squeeze());
} else {
at::linear_out(out_at, in_at, weight_at.squeeze());
}
}

void HostIrEvaluator::handle(kir::Allocate* allocate) {
NVF_ERROR(
allocate->buffer()->isA<TensorView>(),
Expand Down
1 change: 1 addition & 0 deletions csrc/host_ir/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class HostIrEvaluator final : public OptOutDispatch {
void handle(EndCoalescing* end_coalescing) override;
void handle(kir::IfThenElse* if_then_else) override;
void handle(MatmulOp* matmul) override;
void handle(LinearOp* linear) override;
void handle(kir::Allocate* allocate) override;
void unhandled(Statement* stmt) override;

Expand Down
86 changes: 67 additions & 19 deletions csrc/host_ir/lower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void lowerToReduceScatter(
std::vector<Expr*> HostIrLower::lower(Expr* c) {
FusionGuard fg(c->fusion());

if (c->isA<MatmulOp>()) {
if (c->isOneOf<MatmulOp, LinearOp>()) {
return lowerToCollectiveBasedPipelinedGemmComm(c);
}

Expand Down Expand Up @@ -342,30 +342,71 @@ bool HostIrLower::canLower(Expr* expr, bool ignore_inner_resharding) {
}
return ldst->as<LoadStoreOp>()->opType() == LoadStoreOpType::Set;
} else if (auto* matmul = dynamic_cast<MatmulOp*>(expr)) {
// For now we only support c = matmul(a,b) when b,c are fully replicated and
// a is sharded on axis 1
// For now we only support out = matmul(a,b) when b, out are fully
// replicated, a is sharded on axis 1, and out i stream-parallelized on axis
// 0.
return !isSharded(matmul->inB()) && !isSharded(matmul->out()) &&
matmul->inA()->axis(0)->getParallelType() == ParallelType::Serial &&
getShardedLogicalAxis(matmul->inA(), ParallelType::DIDx) == 1 &&
matmul->out()->axis(0)->getParallelType() == ParallelType::Stream;
} else if (auto* linear = dynamic_cast<LinearOp*>(expr)) {
// For now we only support out = linear(a, b, bias) when b, bias, and out
// are fully replicated, a is sharded on axis 1, and out i
// stream-parallelized on axis 0.
auto* a = linear->inA()->as<TensorView>();
auto* b = linear->inB()->as<TensorView>();
auto* bias = linear->bias()->as<TensorView>();
;
auto* out = linear->out()->as<TensorView>();
;
return !isSharded(b) && !(linear->has_bias() && isSharded(bias)) &&
!isSharded(out) &&
a->axis(0)->getParallelType() == ParallelType::Serial &&
getShardedLogicalAxis(a, ParallelType::DIDx) == 1 &&
out->axis(0)->getParallelType() == ParallelType::Stream;
}
return false;
}

std::vector<Expr*> HostIrLower::lowerToCollectiveBasedPipelinedGemmComm(
Expr* expr) {
auto matmul = expr->as<MatmulOp>();
NVF_ERROR(matmul != nullptr, "Expect a MatmulOp, got", expr);
TensorView* tva = matmul->inA();
TensorView* tvb = matmul->inB();
TensorView* tvc = matmul->out();
NVF_ERROR(
(expr->isOneOf<MatmulOp, LinearOp>()),
"Expect a MatmulOp or a LinearOp, but got",
expr);
TensorView* tva = nullptr;
TensorView* tvb = nullptr;
TensorView* tv_bias = nullptr;
TensorView* tv_out = nullptr;
if (auto* matmul = dynamic_cast<MatmulOp*>(expr)) {
tva = matmul->inA();
tvb = matmul->inB();
tv_out = matmul->out();
} else {
auto* linear = dynamic_cast<LinearOp*>(expr);
tva = linear->inA()->as<TensorView>();
tvb = linear->inB()->as<TensorView>();
tv_bias = linear->bias()->as<TensorView>();
tv_out = linear->out()->as<TensorView>();
NVF_ERROR(
!(linear->has_bias() && isSharded(tv_bias)),
"The bias ",
tv_bias,
" is expected to not be sharded");
}

NVF_ERROR(
!isSharded(tvb), "The B operand ", tvb, " is expected to not be sharded");
NVF_ERROR(
!isSharded(tvc),
!isSharded(tv_out),
"The output ",
matmul->out(),
tv_out,
" is expected to not be sharded");
NVF_ERROR(
tv_out->axis(0)->getParallelType() == ParallelType::Stream,
"The output ",
tv_out,
" is expected to be stream-parallelized on axis 0");
const int64_t sharded_axis_index =
getShardedLogicalAxis(tva, ParallelType::DIDx);
IterDomain* stream_axis = tva->axis(0);
Expand All @@ -388,9 +429,9 @@ std::vector<Expr*> HostIrLower::lowerToCollectiveBasedPipelinedGemmComm(
auto* allocate_tva_allgathered =
IrBuilder::create<kir::Allocate>(tva_allgathered, MemoryType::Global);

tvc->setMemoryType(MemoryType::Global);
auto* allocate_tvc =
IrBuilder::create<kir::Allocate>(tvc, MemoryType::Global);
tv_out->setMemoryType(MemoryType::Global);
auto* allocate_tv_out =
IrBuilder::create<kir::Allocate>(tv_out, MemoryType::Global);

auto* j =
IrBuilder::create<Val>(DataType::Index); // running index of the for-loop
Expand All @@ -417,14 +458,14 @@ std::vector<Expr*> HostIrLower::lowerToCollectiveBasedPipelinedGemmComm(

TensorView* tva_j = select(tva, 0, j);
TensorView* tva_allgathered_j = select(tva_allgathered, 0, j);
TensorView* tvc_j = select(tvc, 0, j);
TensorView* tv_out_j = select(tv_out, 0, j);

NVF_ERROR(
tva->hasDeviceMesh(),
"The matmul's input ",
tva,
"is expected to have a DeviceMesh");
for (auto tv : {tva_j, tva_allgathered_j, tvc_j}) {
for (auto tv : {tva_j, tva_allgathered_j, tv_out_j}) {
tv->setDeviceMesh(tva->getDeviceMesh());
}

Expand All @@ -435,7 +476,13 @@ std::vector<Expr*> HostIrLower::lowerToCollectiveBasedPipelinedGemmComm(
/*team=*/tva->getDeviceMesh().vector());
auto* wait = IrBuilder::create<hir::Wait>(communication);

auto* mm = IrBuilder::create<MatmulOp>(tvc_j, tva_allgathered_j, tvb);
Expr* compute = nullptr;
if (expr->isA<MatmulOp>()) {
compute = IrBuilder::create<MatmulOp>(tv_out_j, tva_allgathered_j, tvb);
} else {
compute =
IrBuilder::create<LinearOp>(tv_out_j, tva_allgathered_j, tvb, tv_bias);
}

auto* set_back_original_stream =
IrBuilder::create<hir::SetCurrentStream>(original_stream);
Expand All @@ -447,15 +494,16 @@ std::vector<Expr*> HostIrLower::lowerToCollectiveBasedPipelinedGemmComm(
tva_allgathered_j->definition(),
communication,
wait,
tvc_j->definition(),
mm,
tv_out_j->definition(),
compute,
set_back_original_stream,
sync_stream};
for (Expr* expr : loop_body) {
for_loop->body().push_back(expr);
}

return {get_current_stream, allocate_tva_allgathered, allocate_tvc, for_loop};
return {
get_current_stream, allocate_tva_allgathered, allocate_tv_out, for_loop};
}

std::unique_ptr<hir::HostIrContainer> HostIrLower::lower(
Expand Down
1 change: 0 additions & 1 deletion csrc/ir/internal_nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -2269,7 +2269,6 @@ class LinearOp : public Expr {
const ExpressionEvaluator& ee,
const std::vector<PolymorphicValue>& inputs) const override;

private:
bool has_bias() const {
return inputs().size() == 3;
}
Expand Down
86 changes: 86 additions & 0 deletions tests/cpp/test_host_irs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,92 @@ TEST_F(MatmulHostIrTest, HostIrMatmulOut) {
EXPECT_TRUE(ref_output.allclose(c_tensor));
}

using LinearHostIrTest = NVFuserTest;

TEST_F(LinearHostIrTest, HostIr) {
constexpr int64_t B = 32;
constexpr int64_t M = 64;
constexpr int64_t K = 128;
constexpr int64_t N = 256;

auto hic = std::make_unique<HostIrContainer>();
FusionGuard fg(hic.get());

TensorView* in = makeContigTensor(3);
TensorView* weight = makeContigTensor(2);
TensorView* bias = makeContigTensor(1);
TensorView* out = linear(in, weight, bias);

hic->addInput(in);
hic->addInput(weight);
hic->addInput(bias);
hic->addOutput(out);

hic->pushBackTopLevelExprs(out->definition());

HostIrEvaluator hie(std::move(hic));

auto options = at::TensorOptions().device(at::kCUDA, 0).dtype(torch::kFloat);
at::Tensor in_at = at::randn({B, M, K}, options);
at::Tensor weight_at = at::randn({N, K}, options);
at::Tensor bias_at = at::randn({N}, options);
std::unordered_map<Val*, c10::IValue> concrete_input_buffers = {
{hie.inputs().at(0), in_at},
{hie.inputs().at(1), weight_at},
{hie.inputs().at(2), bias_at}};

auto output = hie.runWithInput(concrete_input_buffers).at(0);

// validate
auto ref_output = at::linear(in_at, weight_at, bias_at);

EXPECT_TRUE(ref_output.allclose(output));
}

TEST_F(LinearHostIrTest, HostIrLinearOut) {
constexpr int64_t B = 32;
constexpr int64_t M = 64;
constexpr int64_t K = 128;
constexpr int64_t N = 256;

auto hic = std::make_unique<HostIrContainer>();
FusionGuard fg(hic.get());

TensorView* in = makeContigTensor(3);
TensorView* weight = makeContigTensor(2);
TensorView* bias = makeContigTensor(1);
TensorView* out = makeContigTensor(3);

auto linear_op = IrBuilder::create<LinearOp>(out, in, weight, bias);

hic->addInput(in);
hic->addInput(weight);
hic->addInput(bias);
hic->addInput(out);

hic->pushBackTopLevelExprs(linear_op);

HostIrEvaluator hie(std::move(hic));

auto options = at::TensorOptions().device(at::kCUDA, 0).dtype(torch::kFloat);
at::Tensor in_at = at::randn({B, M, K}, options);
at::Tensor weight_at = at::randn({N, K}, options);
at::Tensor bias_at = at::randn({N}, options);
at::Tensor out_at = at::empty({B, M, N}, options);
std::unordered_map<Val*, c10::IValue> concrete_input_buffers = {
{hie.inputs().at(0), in_at},
{hie.inputs().at(1), weight_at},
{hie.inputs().at(2), bias_at},
{hie.inputs().at(3), out_at}};

hie.runWithInput(concrete_input_buffers);

// validate
auto ref_output = at::linear(in_at, weight_at, bias_at);

EXPECT_TRUE(ref_output.allclose(out_at));
}

using SelectHostIrTestParams = bool;
using SelectHostIrTest = NVFuserFixtureParamTest<SelectHostIrTestParams>;

Expand Down
61 changes: 61 additions & 0 deletions tests/cpp/test_multidevice_host_ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,67 @@ TEST_F(OverlapDistributedMatmulTest, AG_matmul) {
EXPECT_TRUE(torch::allclose(tc_ref, tc, 1e-2, 1e-2));
}

TEST_F(OverlapDistributedMatmulTest, AG_linear) {
constexpr int64_t M = 32768;
constexpr int64_t K = 32768;
constexpr int64_t N = 1024;
constexpr int64_t S = 8;
const int64_t D = communicator_->size();
if (M % (D * S) != 0) {
GTEST_SKIP() << "M must be a multiple of D * S, but got M = " << M
<< ", D = " << D << ", S = " << S;
}

auto fusion = std::make_unique<Fusion>();
FusionGuard fg(fusion.get());

TensorView* in = makeContigTensor(4); //[S, DIDx(D), M/(S*D), K]
TensorView* weight = makeContigTensor(2); //[N, K]
TensorView* bias = makeContigTensor(1); //[N]
TensorView* out = linear(in, weight, bias); //[S, D, M/(S*D), N]

fusion->addInput(in);
fusion->addInput(weight);
fusion->addInput(bias);
fusion->addOutput(out);

auto mesh = DeviceMesh::createForNumDevices(D);
in->setDeviceMesh(mesh);
weight->setDeviceMesh(mesh);
bias->setDeviceMesh(mesh);
out->setDeviceMesh(mesh);

in->axis(1)->parallelize(ParallelType::DIDx);
out->axis(0)->parallelize(ParallelType::Stream);

MultiDeviceExecutor executor(std::move(fusion), *communicator_);

auto tensor_options =
at::TensorOptions().dtype(at::kFloat).device(communicator_->device());
at::Tensor in_at_unsharded =
at::randn({S, D, M / (S * D), K}, tensor_options);
at::Tensor in_at = in_at_unsharded.slice(
1, communicator_->deviceId(), communicator_->deviceId() + 1);
at::Tensor weight_at = at::randn({N, K}, tensor_options);
at::Tensor bias_at = at::randn({N}, tensor_options);
at::Tensor out_ref = at::linear(in_at_unsharded, weight_at, bias_at);

std::vector<c10::IValue> inputs = {in_at, weight_at, bias_at};
at::Tensor out_at;

constexpr int64_t kNumberOfIterations = 20;
constexpr int64_t kNumberOfWarmupIterations = 5;
for (auto i : c10::irange(kNumberOfIterations)) {
if (i == kNumberOfWarmupIterations) {
cudaProfilerStart();
}
out_at = executor.runWithInput(inputs).at(0);
}
cudaProfilerStop();

EXPECT_TRUE(torch::allclose(out_ref, out_at, 1e-2, 1e-2));
}

} // namespace hir

} // namespace nvfuser
Loading