1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
 /* Write blob data from the memory buffer bound to BLOB into the dataset that BLOB is bound to.
  *
  * Flags examined by this function:
  *   SS_ALLSAME         -- All tasks supply the same data. Only blob task zero calls H5Dwrite() and every task then waits
  *                         at a barrier on the blob communicator. Implies SS_BLOB_COLLECTIVE.
  *   SS_BLOB_UNBIND     -- Detach the memory buffer, memory datatype, and memory data space from the blob.
  *   SS_BLOB_FREE       -- Release the memory buffer when the library is done with it. Requires SS_BLOB_UNBIND.
  *                         - Synchronous write: the buffer is freed before returning.
  *                         - Two-phase write: the flag is forwarded to ss_blob_async_append(), presumably so that layer
  *                           frees it. NOTE(review): confirm.
  *   SS_BLOB_COLLECTIVE -- Use the file's collective transfer property list. In parallel builds with aggregation
  *                         enabled (gblob->agg.maxaggtasks>0), use two-phase I/O instead.
  *   SS_BLOB_ASYNC      -- Parallel builds with aggregation enabled only: queue the two-phase write without waiting for
  *                         it to complete.
  *   SS_BLOB_COPY       -- Two-phase path only: always stage the data through a private conversion buffer, even when
  *                         the memory and file datatypes are equal.
  *
  * If ss_mpi_extras() leaves BLOB null, all blob-specific work is skipped. This is presumably the case of a task that
  * participates only for collectivity.
  *
  * Returns the result of SS_LEAVE(0): zero on success. Failure values come from the SS_ERROR machinery defined
  * elsewhere, presumably a negative herr_t. NOTE(review): confirm. */
 herr_t
 ss_blob_write(ss_blob_t *blob,                  /* The blob for which data is written, which must be bound to both memory
                                                  * and a dataset. Any task that is part of the file communicator but not the
                                                  * scope communicator is participating solely for the sake of collectivity and
                                                  * should pass the blob's top scope here instead. */
               hid_t iospace,                    /* This is an optional hyperslab describing the part of the blob that is to
                                                  * be written. The extent and selection are relative to the portion of the
                                                  * dataset owned by the blob and described in some previous call to
                                                  * ss_blob_bind_f() (perhaps in an earlier execution). If not specified then
                                                  * all of the blob's data is written. */
               unsigned flags,                   /* Various bit flags commonly passed to this function. */
               ss_prop_t UNUSED *props           /* See [Blob Properties]. (Unused at this time.) */
               )
 {
     SS_ENTER(ss_blob_write, herr_t);
     ss_scope_t          topscope=SS_SCOPE_NULL; /* The top scope for BLOB */
     ss_scope_t          blobscope=SS_SCOPE_NULL;/* Scope that contains BLOB */
     ss_gblob_t          *gblob=NULL;            /* Global blob table from the file */
     size_t              d_idx=SS_NOSIZE;        /* Index into gblob->d table */
     void                *buffer=NULL;           /* Source buffer that was bound to the blob */
     hid_t               iom=-1, iof=-1;         /* Data spaces for memory and file -- extent and selections */
     hid_t               dxpl=-1;                /* Dataset transfer property list -- not copied here so do not close */
     hid_t               mtype=-1;               /* Memory datatype saved after unbind */
     herr_t              status;                 /* Return status of called functions */
     void                *conv_buf=NULL;         /* Conversion buffer; freed at cleanup unless handed off to the async layer */
     int                 blobtask=-1;            /* The rank of the calling task in the blob communicator */
     ss_gfile_t          *gfile=NULL;            /* The file descriptor for the top scope */
     static ss_prop_t    *syncprops=NULL;        /* Synchronization properties */
     MPI_Comm            blobcomm=SS_COMM_NULL;  /* Communicator for the blob's scope */

 #ifdef HAVE_PARALLEL
     int                 ndims;                  /* Dimensionality */
     ss_blob_stride_t    iom_stride;             /* Stride description of memory buffer */
     ss_blob_stride_t    conv_stride;            /* Stride description of conversion buffer */
     hsize_t             buf_size[H5S_MAX_RANK]; /* Size of memory buffer */
     hsize_t             slab_start[H5S_MAX_RANK];/* Starting address of slab in buffer */
     hsize_t             slab_count[H5S_MAX_RANK];/* Size of the slab for I/O */
     hssize_t            nelmts_h;               /* Signed result of H5Sget_select_npoints(); negative on failure */
     size_t              nelmts;                 /* Total number of elements being written */
     size_t              max_tsize;              /* Maximum of memory and file datatype sizes */
 #endif

     /* Writing identical data from all tasks is inherently a collective operation. */
     if (flags & SS_ALLSAME) flags |= SS_BLOB_COLLECTIVE;
     if (!ss_mpi_extras((ss_pers_t**)&blob, &topscope)) SS_ERROR(FAILED);
     gfile = SS_GFILE_LINK(&topscope);
     gblob = gfile->gblob;

     /* Create synchronization property list (once; it is cached in a function-level static) */
     if (!syncprops) {
         if (NULL==(syncprops=ss_prop_new("blob sync props"))) SS_ERROR(FAILED);
         if (ss_prop_add(syncprops, "dset", H5T_NATIVE_HID, NULL)<0) SS_ERROR(FAILED);
     }

     if (blob) {
         /* Get blob's scope, communicator, and MPI task number in that communicator */
         if (NULL==ss_pers_scope((ss_pers_t*)blob, &blobscope)) SS_ERROR(FAILED);
         if (ss_scope_comm(&blobscope, &blobcomm, &blobtask, NULL)<0) SS_ERROR(FAILED);

         /* Verify data space consistency to the extent possible without actually calling H5Dwrite(). This also generates the
          * dataset-dimension data spaces for memory and file. We have to do this before unbinding the memory but we still want
          * to unbind it if necessary. So the error return is delayed until after the unbinding. */
         if ((status = ss_blob_ckspaces(blob, iospace, &iom, &iof))<0) SS_SAVE;

         /* Get the source buffer at this early stage so we can unbind before most errors if so requested. */
         if (NULL==(buffer=SS_BLOB(blob)->m.mem)) SS_ERROR_FMT(USAGE, ("blob is not bound to memory"));

         /* Unbind memory. Either way MTYPE ends up as a datatype this function owns and closes at the end. */
         if (flags & SS_BLOB_UNBIND) {
             SS_BLOB(blob)->m.mem = NULL;
             mtype = SS_BLOB(blob)->m.mtype; /*will be closed at end*/
             SS_BLOB(blob)->m.mtype = 0;
             if (H5Sclose(SS_BLOB(blob)->m.mspace)<0) SS_ERROR(HDF5);
             SS_BLOB(blob)->m.mspace = 0;
         } else {
             if ((mtype = H5Tcopy(SS_BLOB(blob)->m.mtype))<0) SS_ERROR(HDF5);
         }

         /* Now that we've had an opportunity to unbind, raise the ss_blob_ckspaces() error if there was one. */
         if (status<0) {
             SS_REFAIL;
             SS_ERROR(FAILED);
         }

         if ((flags & SS_BLOB_FREE) && !(flags & SS_BLOB_UNBIND)) SS_ERROR_FMT(USAGE, ("SS_BLOB_FREE requires SS_BLOB_UNBIND"));

         /* Make sure the blob is bound to a dataset. Perhaps this can be relaxed in the future, making it symmetric with
          * ss_blob_read() which can operate when the blob is not bound to memory. */
         if (!SS_BLOB(blob)->dsetaddr) SS_ERROR_FMT(USAGE, ("blob is not bound to a dataset"));
         if (SS_NOSIZE==(d_idx=ss_blob_didx(blob))) SS_ERROR(FAILED);
         SS_ASSERT(SS_BLOB(blob)->dsetaddr==*((haddr_t*)(&gblob->d[d_idx].stat.objno)[0]));
         SS_ASSERT(gblob->d[d_idx].dset>0);

 #ifdef HAVE_PARALLEL
         /* Synchronous or asynchronous two-phase I/O. We can use two-phase I/O if it is enabled and either the SS_BLOB_ASYNC
          * or SS_BLOB_COLLECTIVE flags are set. However, if the SS_ALLSAME flag is set then we fall back to the basic
          * H5Dwrite() because two-phase I/O has not been optimized for that case: the basic H5Dwrite() is probably faster. */
         if (gblob->agg.maxaggtasks>0 &&
             (flags & (SS_BLOB_ASYNC|SS_BLOB_COLLECTIVE)) &&
             0==(flags & SS_ALLSAME)) {
             /* H5Sget_select_npoints() reports failure with a negative value. Keep the result signed so that an error is
              * not silently reinterpreted as an enormous element count. */
             if ((nelmts_h=H5Sget_select_npoints(iom))<0) SS_ERROR(HDF5);
             if (nelmts_h>0) {
                 SS_ASSERT((hsize_t)nelmts_h<(hsize_t)SS_NOSIZE); /*because we cast it below*/
                 nelmts = (size_t)nelmts_h;
                 if ((flags & SS_BLOB_COPY) || !H5Tequal(mtype, gblob->d[d_idx].dtype)) {
                     /* Pack the selected elements contiguously into a private buffer and convert them in place to the
                      * dataset's datatype. That buffer then replaces the caller's buffer for the queued write. */
                     max_tsize = MAX(H5Tget_size(mtype), H5Tget_size(gblob->d[d_idx].dtype));
                     if (NULL==(conv_buf=malloc(max_tsize*nelmts))) SS_ERROR(RESOURCE);
                     ss_blob_sendbuf_total_g++; /*total number of buffers allocated; not bytes!*/
                     if ((ndims = ss_blob_ckspace(iom, H5S_MAX_RANK, buf_size, slab_start, slab_count, NULL))<0) SS_ERROR(FAILED);
                     SS_ASSERT(ndims>0); /*probably doesn't work for scalar? [rpm 2004-06-23]*/
                     if (ss_blob_stride(ndims, buf_size, slab_start, slab_count, &iom_stride)<0) SS_ERROR(FAILED);
                     if (ss_blob_stride_1((hsize_t)0, (hsize_t)nelmts, &conv_stride)<0) SS_ERROR(FAILED);
                     if (ss_blob_stride_copy(conv_buf, &conv_stride, buffer, &iom_stride, H5Tget_size(mtype))<0) SS_ERROR(FAILED);
                     if (H5Tconvert(mtype, gblob->d[d_idx].dtype, nelmts, conv_buf, NULL, H5P_DEFAULT)<0) SS_ERROR(HDF5);
                     if (flags & SS_BLOB_FREE) SS_FREE(buffer);
                     buffer = conv_buf;
                     flags |= SS_BLOB_FREE;  /*the conversion buffer must be released once the queued write is done*/
                     /* The memory space now describes the packed conversion buffer rather than the caller's layout. */
                     if (H5Sclose(iom)<0) SS_ERROR(HDF5);
                     if ((iom = H5Screate_simple(ndims, slab_count, slab_count))<0) SS_ERROR(HDF5);
                 }
                 if (ss_blob_async_append(gblob, d_idx, mtype, iom, iof, buffer, flags)<0) SS_ERROR(FAILED);
                 /* Ownership of the datatype, the data spaces, and (via SS_BLOB_FREE) the conversion buffer has passed to
                  * the async layer. Forget them here so that the cleanup code below neither closes nor frees them out from
                  * under the pending write. */
                 mtype = iom = iof = -1;
                 conv_buf = NULL;
             } else if (flags & SS_BLOB_FREE) {
                 /* Nothing was selected, so nothing is queued and no other code will release the caller's buffer. Free it
                  * here, just as the synchronous path below does. */
                 SS_FREE(buffer);
             }

             /* If the ASYNC flag is not set (then the COLLECTIVE flag _must_ be set) then we must block for the two-phase
              * I/O operation to complete: both data shipping and the H5Dwrite() call. */
             if (0==(flags & SS_BLOB_ASYNC)) {
                 SS_ASSERT(flags & SS_BLOB_COLLECTIVE);
                 if (ss_prop_set(syncprops, "dset", H5T_NATIVE_HID, &(gblob->d[d_idx].dset))<0) SS_ERROR(FAILED);
                 if (ss_blob_synchronize(&topscope, syncprops)<0) SS_ERROR(FAILED); /*initiate data shipping*/
                 if (ss_blob_async_flush(gblob, d_idx, gfile->dxpl_independent, SS_STRICT)<0) SS_ERROR(FAILED);
                 /* ISSUE: I don't think an MPI_Barrier() is sufficient to make the aggregators' independent H5Dwrite()
                  *        data to become available to the other tasks for reading. We may need either an MPI_File_sync()
                  *        or we may need to have the tasks write collectively. [rpm 2004-07-26] */
                 if (ss_mpi_barrier(blobcomm)<0) SS_ERROR(FAILED); /*wait for aggregators' independent H5Dwrite() calls*/
             }
             goto done;
         }
 #endif

         /* Write the data synchronously */
         if (flags & SS_ALLSAME) {
             /* Only one task needs to write the data. We use blob task zero by default. However, all tasks must synchronize
              * because the other tasks were expecting to not return until their data had been written. */
             if (0==blobtask) {
                 dxpl = gfile->dxpl_independent;
                 if (H5Dwrite(gblob->d[d_idx].dset, mtype, iom, iof, dxpl, buffer)<0) SS_ERROR(HDF5);
             }
             /* ISSUE: I don't think an MPI_Barrier() is sufficient here. We may need either an MPI_File_sync() or we may need
              *        to have the tasks write collectively but only one supplying data. [rpm 2004-07-26] */
             if (ss_mpi_barrier(blobcomm)<0) SS_ERROR(FAILED);
         } else {
             /* All tasks have data to write */
             dxpl = (flags & SS_BLOB_COLLECTIVE) ? gfile->dxpl_collective : gfile->dxpl_independent;
             if (H5Dwrite(gblob->d[d_idx].dset, mtype, iom, iof, dxpl, buffer)<0) SS_ERROR(HDF5);
         }
         if (flags & SS_BLOB_FREE) SS_FREE(buffer);
     }

 #ifdef HAVE_PARALLEL
  done:
 #endif

     /* Successful cleanup: close remaining HDF5 objects, reporting any failure, and mark them closed. */
     if (iom>0 && H5Sclose(iom)<0) SS_ERROR(HDF5);
     iom = -1;
     if (iof>0 && H5Sclose(iof)<0) SS_ERROR(HDF5);
     iof = -1;
     if (mtype>0 && H5Tclose(mtype)<0) SS_ERROR(HDF5);
     mtype = -1;

  SS_CLEANUP:
     /* Error-path cleanup: anything still owned by this function is released without further error checking. */
     if (iom>0) H5Sclose(iom);
     if (iof>0) H5Sclose(iof);
     if (mtype>0) H5Tclose(mtype);
     SS_FREE(conv_buf);
     SS_LEAVE(0);
 }