writer->thread = NULL;
writer->command = command;
- writer->started = xbt_os_sem_init(0);
+ writer->written = xbt_os_sem_init(0);
+ writer->can_write = xbt_os_sem_init(0);
+
+ writer->done = 0;
return writer;
}
/*
 * Release a writer object and reset the caller's handle.
 *
 * writer: address of the writer handle. The object that *writer points to is
 *         passed to free(), and *writer is then set to NULL so the caller is
 *         not left holding a dangling handle.
 *
 * The handle address is dereferenced without a NULL check.
 *
 * NOTE(review): no semaphore is destroyed here; the only teardown calls in
 * this function are commented out (see below).
 */
void
writer_free(writer_t* writer)
{
+
/*
 * Disabled semaphore teardown, kept as found.
 * NOTE(review): the first call names a `started` member, while the visible
 * initialization code sets up `written` and `can_write` and no longer
 * initializes `started`. The name looks stale; confirm before re-enabling.
 */
+ /*xbt_os_sem_destroy((*writer)->started);
+ xbt_os_sem_destroy((*writer)->can_write);*/
+
/* Free the object first, then clear the caller's handle. */
free(*writer);
*writer = NULL;
}
{
writer_t writer = (writer_t)p;
command_t command = writer->command;
- long number_of_bytes_to_write = command->context->input->used;
+ int number_of_bytes_to_write = command->context->input->used;
char* input = (char*)(command->context->input->data);
int got;
+ int released = 0;
+
+
+ xbt_os_sem_acquire(writer->can_write);
- xbt_os_sem_release(writer->started);
while(!command->failed && !command->interrupted && !command->successeded && number_of_bytes_to_write > 0)
{
- got = number_of_bytes_to_write > SSIZE_MAX ? SSIZE_MAX : number_of_bytes_to_write;
- got = write( writer->command->stdin_fd, input, got );
-
+ got = number_of_bytes_to_write > PIPE_BUF ? PIPE_BUF : number_of_bytes_to_write;
+ got = write(writer->command->stdin_fd, input, got );
+
if(got < 0)
{
- if(EINTR == errno || EAGAIN == errno)
- {
+ if(EINTR == errno)
+ continue;
+
+ else if(EAGAIN == errno)
+ {/* the pipe is full */
+ if(!released)
+ {
+ xbt_os_sem_release(writer->written);
+ released = 1;
+ }
+
continue;
}
else if(EPIPE == errno)
input += got;
if(got == 0)
- usleep(100);
+ xbt_os_thread_yield();
+
+ }
+
+ if(!released)
+ {
+ xbt_os_sem_release(writer->written);
+ released = 1;
}
+
+ close(command->stdin_fd);
+ command->stdin_fd = INDEFINITE_FD;
+
command->context->input->data[0]='\0';
command->context->input->used=0;
command_handle_failure(command, csr_write_pipe_broken);
}
-
- close(command->stdin_fd);
- command->stdin_fd = INDEFINITE_FD;
+ writer->done = 1;
return NULL;