1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
 /* Collectively create a one-dimensional HDF5 dataset large enough to hold the data bound to BLOB on the participating
  * tasks, then bind BLOB to (a selection of) that dataset via ss_blob_bind_f().
  *
  * At most one of SS_ALLSAME, SS_BLOB_EACH and SS_BLOB_RANK may be set in FLAGS (asserted below). SS_BLOB_EXTEND requests
  * an extendible dataset (unlimited maximum size), which requires a chunked dataset creation property list; one is
  * created here when the caller does not supply a "dcpl" property.
  *
  * Leaves through SS_LEAVE(0); failures are reported through the SS_ERROR() family of macros. */
 herr_t
 ss_blob_mkstorage(ss_blob_t *blob,              /* The blob for which a dataset should be created. As a temporary work around
                                                  * for HDF5 limitations, tasks that are part of the blob's file communicator
                                                  * but not the blob's scope communicator should pass the top-level scope for
                                                  * the file that contains the blob. See the parallel notes for details. */
                   hsize_t *size,                /* OUT: Optional pointer which, upon successful return, will point to the
                                                  * cumulative number of elements from all lower-rank MPI tasks.  This can be
                                                  * useful in some cases for the caller to compute the starting offset of a
                                                  * task's contribution to the dataset when it is known that all tasks will
                                                  * output data contiguously in task rank order. The reason it's computed by
                                                  * this function is because this function must perform collective
                                                  * communication anyway in order to determine the dataset size. In SS_ALLSAME
                                                  * mode the return value will be zero because each task's data starts at the
                                                  * beginning of the blob. Likewise, for SS_BLOB_EACH mode the return value is
                                                  * zero because each task has its own blob to describe where that task's data
                                                  * starts in the dataset shared by all the tasks. */
                   unsigned flags,               /* Certain bit flags that affect the operation of this function. See the
                                                  * description for details. */
                   ss_prop_t *props              /* Optional properties (see Blob Properties). If a dataset creation property
                                                  * list and/or dataset name is supplied then they must be the same across all
                                                  * calling tasks. */
                   )
 {
     SS_ENTER(ss_blob_mkstorage, herr_t);
     ss_scope_t          topscope=SS_SCOPE_NULL; /* Top scope for BLOB */
     ss_scope_t          blobscope=SS_SCOPE_NULL;/* Scope in which the blob lives if this is a blob task */
     ss_gblob_t          *gblob=NULL;            /* The gblob table from the top scope's gfile entry */
     hid_t               dcpl=H5P_DEFAULT;       /* Dataset creation property list */
     hid_t               dcpl_close=-1;          /* Dataset creation property list to close during cleanup */
     long                my_nelmts_l=0;          /* Should be hssize_t but long is the biggest MPI type supported by reduce */
     long                all_nelmts_l=0;         /* Sum of all my_nelmts across all tasks in the file communicator. */
     hsize_t             my_nelmts;              /* Number of dataset elements to be owned by the BLOB */
     hsize_t             all_nelmts;             /* Same as all_nelmts_l except type is hsize_t for passing ptr to HDF5 */
     hsize_t             dset_size;              /* Number of elements in the created dataset */
     hsize_t             dset_maxsize;           /* The maximum size in elements of the created dataset */
     hid_t               dset=-1, fspace=-1;     /* Dataset and space to be cleaned up on return */
     hid_t               fid=-1;                 /* File handle, not to be closed on return */
     MPI_Comm            filecomm;               /* File communicator */
     int                 root;                   /* Root task w.r.t. file communicator for broadcast */
     int                 blob_self=-1;           /* Rank of calling task in the blob communicator, or negative */
     int                 blob_ntasks=0;          /* Number of tasks in the blob communicator, or zero */
     int                 file_ntasks=0;          /* Number of tasks in the file communicator. */
     size_t              elmt_size;              /* Size of a single element of the dataset in the file */
     hsize_t             chunk_size=64*1024;     /* Size of chunk: 64 kB adjusted later to elements instead of bytes */
     const char          *dsetname=NULL;         /* Optional additional dataset name */
     ss_file_t           blobfile=SS_FILE_NULL;  /* The file in which BLOB appears */
     hsize_t             offset;                 /* Blob's offset into the dataset */

     if (!ss_mpi_extras((ss_pers_t**)&blob, &topscope)) SS_ERROR(FAILED);
     gblob = SS_GFILE_LINK(&topscope)->gblob;
     if (blob) SS_ASSERT_MEM(blob, ss_blob_t);

     /* The three distribution modes are mutually exclusive */
     SS_ASSERT(((flags & SS_ALLSAME)?1:0) + ((flags & SS_BLOB_EACH)?1:0) + ((flags & SS_BLOB_RANK)?1:0) <= 1);

     /* Blob and file communicator info */
     if (blob) {
         if (NULL==ss_pers_scope((ss_pers_t*)blob, &blobscope)) SS_ERROR(FAILED);
         if (ss_scope_comm(&blobscope, NULL, &blob_self, &blob_ntasks)<0) SS_ERROR(FAILED);
     }
     if (ss_scope_comm(&topscope, &filecomm, NULL, &file_ntasks)<0) SS_ERROR(FAILED);

     /* Dataset creation properties -- same for *all* tasks. When the caller supplies no "dcpl" property we use the default
      * list, except that an extendible dataset needs a chunked layout so we build one here. When the caller does supply a
      * list, it only has to be chunked if SS_BLOB_EXTEND was requested; a non-chunked list is fine for a fixed-size dataset. */
     if (!props || NULL==ss_prop_get(props, "dcpl", H5T_NATIVE_HID, &dcpl)) {
         SS_STATUS_OK;
         if (flags & SS_BLOB_EXTEND) {
             if ((dcpl=dcpl_close=H5Pcreate(H5P_DATASET_CREATE))<0) SS_ERROR(HDF5);
             /* NOTE(review): SS_BLOB(blob) is used here without a null check although code above tolerates blob==NULL --
              * confirm what ss_mpi_extras() leaves in BLOB on non-blob tasks. */
             if (0==(elmt_size=H5Tget_size(SS_BLOB(blob)->m.mtype))) SS_ERROR(HDF5);
             /* Convert the 64 kB chunk from bytes to elements, rounding to nearest but never below one element */
             chunk_size = MAX(1, (chunk_size+elmt_size/2)/elmt_size);
             if (H5Pset_chunk(dcpl, 1, &chunk_size)<0) SS_ERROR(HDF5);
         }
     } else if ((flags & SS_BLOB_EXTEND) && H5D_CHUNKED!=H5Pget_layout(dcpl)) {
         SS_ERROR_FMT(USAGE, ("SS_BLOB_EXTEND flag set but supplied dataset creation property list is not chunked"));
     }

     /* Optional dataset name */
     if (!props || NULL==ss_prop_get(props, "name", H5T_NATIVE_VOIDP, &dsetname)) {
         SS_STATUS_OK;
         dsetname = NULL;
     }

     /* Determine how much total data we have. We only have to sum this across the scope communicator but we need the result
      * across the file communicator. If the SS_ALLSAME or SS_BLOB_RANK flag is set then the size will be calculated and
      * broadcast from blob task zero. If blob_ntasks==file_ntasks then all calling tasks must necessarily also be blob
      * tasks and we may be able to avoid some communication.
      *
      * The element counts are held in `long' variables that are never negative at the point of communication (a negative
      * H5Sget_select_npoints() result is an error), while the MPI datatype passed below is MPI_UNSIGNED_LONG.
      * NOTE(review): MPI_LONG would match the buffer type exactly -- confirm it is available in serial builds before
      * changing. */
     if (blob && SS_BLOB(blob)->m.mem)
         if ((my_nelmts_l = H5Sget_select_npoints(SS_BLOB(blob)->m.mspace))<0) SS_ERROR(HDF5);
     if (flags & SS_ALLSAME) {
         /* All blob tasks are bound to the same size memory, but only one will be used as the total dataset size. */
         if (blob_ntasks!=file_ntasks) {
             if ((root=ss_mpi_elect(blob?(ss_pers_t*)blob:(ss_pers_t*)&topscope))<0) SS_ERROR(FAILED);
             /* NOTE(review): result of ss_mpi_bcast() is not checked -- confirm whether it can report failure */
             ss_mpi_bcast(&my_nelmts_l, (size_t)1, MPI_UNSIGNED_LONG, root, filecomm);
         }
         all_nelmts_l = my_nelmts_l;
     } else if (flags & SS_BLOB_RANK) {
         /* All blob tasks are bound to the same size memory; total size is the sum across all blob tasks */
         all_nelmts_l = my_nelmts_l * blob_ntasks;
         if (blob_ntasks!=file_ntasks) {
             if ((root=ss_mpi_elect(blob?(ss_pers_t*)blob:(ss_pers_t*)&topscope))<0) SS_ERROR(FAILED);
             /* NOTE(review): result of ss_mpi_bcast() is not checked -- confirm whether it can report failure */
             ss_mpi_bcast(&all_nelmts_l, (size_t)1, MPI_UNSIGNED_LONG, root, filecomm);
         }
     } else {
         /* Every task may be bound to a different memory size; total size is the sum across all blob tasks */
 #ifdef HAVE_PARALLEL
         if (MPI_Allreduce(&my_nelmts_l, &all_nelmts_l, 1, MPI_UNSIGNED_LONG, MPI_SUM, filecomm)) SS_ERROR(MPI);
 #else
         all_nelmts_l = my_nelmts_l;
 #endif
     }
     /* Capture both counts now: all_nelmts_l is reused as the MPI_Scan() output buffer below */
     my_nelmts = my_nelmts_l;            /* How many elements belong to this task's blob? Zero on non-blob tasks. */
     all_nelmts = all_nelmts_l;          /* How big is the whole dataset? */

     /* If the user is asking for a scanned SIZE then return that now since we're doing communication anyway. We could have
      * done an allgather of the local size and done the reduction and scan in this function, but separate MPI_Allreduce() and
      * MPI_Scan() seem more readable and use less memory. */
     if (flags & SS_ALLSAME) {
         offset = 0;
         if (size) *size = offset;
     } else if (flags & SS_BLOB_RANK) {
         offset = blob_self * my_nelmts_l;
         if (size) *size = offset;
     } else {
 #ifdef HAVE_PARALLEL
         /* MPI_Scan() is inclusive, so subtract this task's own contribution to get the starting offset */
         if (MPI_Scan(&my_nelmts_l, &all_nelmts_l, 1, MPI_UNSIGNED_LONG, MPI_SUM, filecomm)) SS_ERROR(MPI);
         offset = all_nelmts_l - my_nelmts_l; /* exclusive scan */
 #else
         offset = 0;
 #endif
         if (size) {
             *size = (flags & SS_BLOB_EACH) ? 0 : offset;
         }
     }

     /* It doesn't make much sense to create a zero-size dataset, but there might be some cases where all tasks just happened
      * to have no data. We have two choices with HDF5: create a zero size extendible dataset or create a very small contiguous
      * dataset. We will use a 1d, contiguous dataset of size one when possible because that occupies the least file space and the
      * dimensionality matches what ss_blob_normalize() returns for a null space. */
     if (flags & SS_BLOB_EXTEND) {
         dset_size = all_nelmts;
         dset_maxsize = H5S_UNLIMITED;
     } else {
         dset_size = MAX(1, all_nelmts);
         dset_maxsize = dset_size;
     }
     if ((fspace=H5Screate_simple(1, &dset_size, &dset_maxsize))<0) SS_ERROR(HDF5);

     /* Create the new dataset using either the name supplied in the property list or a temporary name. The temporary name
      * is unlinked immediately after creation; the dataset handle itself stays open. */
     if (dsetname) {
         if (NULL==ss_pers_file((ss_pers_t*)blob, &blobfile)) SS_ERROR(FAILED);
         if ((fid=ss_file_isopen(&blobfile, NULL))<0) SS_ERROR(FAILED);
         if ((dset=H5Dcreate(fid, dsetname, SS_BLOB(blob)->m.mtype, fspace, dcpl))<0) SS_ERROR(HDF5);
     } else {
         if ((dset=H5Dcreate(gblob->storage, "TEMP", SS_BLOB(blob)->m.mtype, fspace, dcpl))<0) SS_ERROR(HDF5);
         if (H5Gunlink(gblob->storage, "TEMP")<0) SS_ERROR(HDF5);
     }

     /* Bind the dataset to the blob(s). In SS_ALLSAME and SS_BLOB_EACH modes the blob covers only this task's
      * [offset, offset+my_nelmts) slab; otherwise the blob covers the whole dataset starting at zero. */
     if ((flags & SS_ALLSAME) || (flags & SS_BLOB_EACH)) {
         if (H5Sselect_slab(fspace, H5S_SELECT_SET, (hsize_t)0, &offset, &my_nelmts)<0) SS_ERROR(HDF5);
     } else {
         if (H5Sselect_slab(fspace, H5S_SELECT_SET, (hsize_t)0, NULL, &all_nelmts)<0) SS_ERROR(HDF5);
     }
     if (ss_blob_bind_f(blob, dset, fspace, (flags & SS_BLOB_EXTEND))<0) SS_ERROR(FAILED);

     /* Successful cleanup. The handle is reset to -1 so the cleanup section below cannot close it a second time. */
     if (dcpl_close>0) H5Pclose(dcpl_close);
     dcpl_close = -1;

  SS_CLEANUP:
     /* NOTE(review): whether the success path also reaches these closes depends on how SS_CLEANUP is defined, which is
      * not visible here -- confirm that dset and fspace are released on success as well as on error. */
     if (dset>0) H5Dclose(dset);
     if (fspace>0) H5Sclose(fspace);
     if (dcpl_close>0) H5Pclose(dcpl_close);

     SS_LEAVE(0);
 }