Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
last version of tesh
[simgrid.git] / tools / tesh2 / src / writer.c
index 97aa2dc..c3ba975 100644 (file)
@@ -13,7 +13,10 @@ writer_new(command_t command)
 
        writer->thread = NULL;
        writer->command = command;
-       writer->started = xbt_os_sem_init(0);
+       writer->written = xbt_os_sem_init(0);
+       writer->can_write = xbt_os_sem_init(0);
+       
+       writer->done = 0;
 
        return writer;
 }
@@ -21,6 +24,10 @@ writer_new(command_t command)
 void
 writer_free(writer_t* writer)
 {
+       
+       /*xbt_os_sem_destroy((*writer)->started);
+       xbt_os_sem_destroy((*writer)->can_write);*/
+       
        free(*writer);
        *writer = NULL;
 }
@@ -145,21 +152,33 @@ writer_start_routine(void* p)
 {
        writer_t writer = (writer_t)p;
        command_t command = writer->command;
-       long number_of_bytes_to_write = command->context->input->used;
+       int number_of_bytes_to_write = command->context->input->used;
        char* input = (char*)(command->context->input->data);
        int got;
+       int released = 0;
+       
+       
+       xbt_os_sem_acquire(writer->can_write);
        
-       xbt_os_sem_release(writer->started);
        
        while(!command->failed && !command->interrupted && !command->successeded && number_of_bytes_to_write > 0)
        {
-               got = number_of_bytes_to_write > SSIZE_MAX ? SSIZE_MAX : number_of_bytes_to_write;
-               got = write( writer->command->stdin_fd, input, got );
-                       
+               got = number_of_bytes_to_write > PIPE_BUF ? PIPE_BUF : number_of_bytes_to_write;
+               got = write(writer->command->stdin_fd, input, got );
+               
                if(got < 0) 
                {
-                       if(EINTR == errno || EAGAIN == errno)
-                       {
+                       if(EINTR == errno)
+                               continue;
+                               
+                       else if(EAGAIN == errno)
+                       {/* the pipe is full */
+                               if(!released)
+                               {
+                                       xbt_os_sem_release(writer->written);
+                                       released = 1;
+                               }
+                               
                                continue;
                        }
                        else if(EPIPE == errno) 
@@ -179,9 +198,20 @@ writer_start_routine(void* p)
                input += got;
                
                if(got == 0)
-                       usleep(100);
+                       xbt_os_thread_yield();
+               
+       }
+       
+       if(!released)
+       {
+               xbt_os_sem_release(writer->written);
+               released = 1;
        }
        
+
+       close(command->stdin_fd);
+       command->stdin_fd = INDEFINITE_FD;
+       
        command->context->input->data[0]='\0';
        command->context->input->used=0;
        
@@ -198,9 +228,7 @@ writer_start_routine(void* p)
                command_handle_failure(command, csr_write_pipe_broken);
        }
        
-       
-       close(command->stdin_fd);
-       command->stdin_fd = INDEFINITE_FD;
+       writer->done = 1;
        
        return NULL;