Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / plugins / jbod.cpp
1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/plugins/jbod.hpp>
7 #include <simgrid/s4u/Comm.hpp>
8 #include <simgrid/s4u/Disk.hpp>
9 #include <simgrid/s4u/Exec.hpp>
10 #include <simgrid/s4u/NetZone.hpp>
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_jbod, s4u, "Logging specific to the JBOD implmentation");
14
15 namespace simgrid::plugin {
16
17 JbodPtr Jbod::create_jbod(s4u::NetZone* zone, const std::string& name, double speed, unsigned int num_disks,
18                           RAID raid_level, double read_bandwidth, double write_bandwidth)
19 {
20   xbt_assert(not ((raid_level == RAID::RAID4 || raid_level == RAID::RAID5) && num_disks < 3),
21              "RAID%d requires at least 3 disks", (int) raid_level);
22   xbt_assert(not (raid_level == RAID::RAID6 && num_disks < 4), "RAID6 requires at least 4 disks");
23
24   auto* jbod = new Jbod();
25   jbod->set_controller(zone->create_host(name, speed));
26   jbod->set_num_disks(num_disks);
27   jbod->set_parity_disk_idx(num_disks -1 );
28   jbod->set_read_disk_idx(-1);
29   jbod->set_raid_level(raid_level);
30   for (unsigned int i = 0; i < num_disks; i++)
31     jbod->get_controller()->create_disk(name + "_disk_" + std::to_string(i), read_bandwidth, write_bandwidth);
32
33   return JbodPtr(jbod, false);
34 }
35
36 JbodIoPtr Jbod::read_async(sg_size_t size)
37 {
38   auto comm = s4u::Comm::sendto_init()->set_source(this->controller_)->set_payload_size(size);
39   std::vector<s4u::IoPtr> pending_ios;
40   sg_size_t read_size = 0;
41   std::vector<s4u::Disk*> targets;
42   switch(raid_level_) {
43     case RAID::RAID0:
44       read_size = size / num_disks_;
45       targets = controller_->get_disks();
46       break;
47     case RAID::RAID1:
48       read_size = size;
49       targets.push_back(controller_->get_disks().at(get_next_read_disk_idx()));
50       break;
51     case RAID::RAID4:
52       read_size = size / (num_disks_ - 1);
53       targets = controller_->get_disks();
54       targets.pop_back();
55       break;
56     case RAID::RAID5:
57       read_size = size / (num_disks_ - 1);
58       targets = controller_->get_disks();
59       targets.erase(targets.begin() + (get_parity_disk_idx() + 1 % num_disks_));
60       break;
61     case RAID::RAID6:
62       read_size = size / (num_disks_ - 2);
63       targets = controller_->get_disks();
64       if ( (get_parity_disk_idx() + 2 % num_disks_) == 0 ) {
65         targets.pop_back();
66         targets.erase(targets.begin());
67       } else if (get_parity_disk_idx() + 1 == static_cast<int>(num_disks_)) {
68         targets.pop_back();
69         targets.pop_back();
70       } else {
71         targets.erase(targets.begin() + (get_parity_disk_idx() + 1) % num_disks_,
72                       targets.begin() + get_parity_disk_idx() + 3);
73       }
74       break;
75     default:
76       xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
77   }
78   for (const auto* disk : targets) {
79     auto io = s4u::IoPtr(disk->io_init(read_size, s4u::Io::OpType::READ));
80     io->set_name(disk->get_name())->start();
81     pending_ios.push_back(io);
82   }
83
84   return JbodIoPtr(new JbodIo(this, comm, nullptr, pending_ios, s4u::Io::OpType::READ));
85 }
86
87 sg_size_t Jbod::read(sg_size_t size)
88 {
89   read_async(size)->wait();
90   return size;
91 }
92
93 JbodIoPtr Jbod::write_async(sg_size_t size)
94 {
95   auto comm = s4u::Comm::sendto_init(s4u::Host::current(), this->get_controller());
96   std::vector<s4u::IoPtr> pending_ios;
97   sg_size_t write_size = 0;
98   switch(raid_level_) {
99     case RAID::RAID0:
100       write_size = size / num_disks_;
101       break;
102     case RAID::RAID1:
103       write_size = size;
104       break;
105     case RAID::RAID4:
106       write_size = size / (num_disks_ - 1);
107       break;
108     case RAID::RAID5:
109       update_parity_disk_idx();
110       write_size = size / (num_disks_ - 1);
111       break;
112     case RAID::RAID6:
113       update_parity_disk_idx();
114       update_parity_disk_idx();
115       write_size = size / (num_disks_ - 2);
116       break;
117     default:
118       xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
119   }
120   for (const auto* disk : get_controller()->get_disks()) {
121     auto io = s4u::IoPtr(disk->io_init(write_size, s4u::Io::OpType::WRITE));
122     io->set_name(disk->get_name());
123     pending_ios.push_back(io);
124   }
125
126   s4u::ExecPtr parity_block_comp = nullptr;
127   if (raid_level_ == RAID::RAID4 || raid_level_ == RAID::RAID5 || raid_level_ == RAID::RAID6) {
128     // Assume 1 flop per byte to write per parity block and two for RAID6.
129     // Do not assign the Exec yet, will be done after the completion of the CommPtr
130     if (raid_level_ == RAID::RAID6)
131       parity_block_comp = s4u::Exec::init()->set_flops_amount(200 * write_size);
132     else
133       parity_block_comp = s4u::Exec::init()->set_flops_amount(write_size);
134   }
135
136   comm->set_payload_size(size)->start();
137   return JbodIoPtr(new JbodIo(this, comm, parity_block_comp, pending_ios, s4u::Io::OpType::WRITE));
138 }
139
140 sg_size_t Jbod::write(sg_size_t size)
141 {
142   write_async(size)->wait();
143   return size;
144 }
145
146 void JbodIo::wait()
147 {
148   if (type_ == s4u::Io::OpType::WRITE) {
149     transfer_->wait();
150     XBT_DEBUG("Data received on JBOD");
151     if (parity_block_comp_) {
152       parity_block_comp_->set_host(jbod_->get_controller())->wait();
153       XBT_DEBUG("Parity block computed");
154     }
155     XBT_DEBUG("Start writing");
156     for (const auto& io : pending_ios_)
157       io->start();
158   }
159
160   for (const auto& io : pending_ios_) {
161     XBT_DEBUG("Wait for I/O on %s", io->get_cname());
162     io->wait();
163   }
164
165   if (type_ == s4u::Io::OpType::READ) {
166     XBT_DEBUG("Data read on JBOD, send it to %s", s4u::Host::current()->get_cname());
167     transfer_->set_destination(s4u::Host::current())->wait();
168   }
169 }
170 } // namespace simgrid::plugin