1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 | herr_t
ss_blob_mkstorage(ss_blob_t *blob,      /* The blob for which a dataset should be created. As a temporary work around
                                         * for HDF5 limitations, tasks that are part of the blob's file communicator
                                         * but not the blob's scope communicator should pass the top-level scope for
                                         * the file that contains the blob. See the parallel notes for details. */
                  hsize_t *size,        /* OUT: Optional pointer which, upon successful return, will point to the
                                         * cumulative number of elements from all lower-rank MPI tasks. This can be
                                         * useful in some cases for the caller to compute the starting offset of a
                                         * task's contribution to the dataset when it is known that all tasks will
                                         * output data contiguously in task rank order. The reason it's computed by
                                         * this function is because this function must perform collective
                                         * communication anyway in order to determine the dataset size. In SS_ALLSAME
                                         * mode the return value will be zero because each task's data starts at the
                                         * beginning of the blob. Likewise, for SS_BLOB_EACH mode the return value is
                                         * zero because each task has its own blob to describe where that task's data
                                         * starts in the dataset shared by all the tasks. */
                  unsigned flags,       /* Certain bit flags that affect the operation of this function. See the
                                         * description for details. */
                  ss_prop_t *props      /* Optional properties (see Blob Properties). If a dataset creation property
                                         * list and/or dataset name is supplied then they must be the same across all
                                         * calling tasks. */
                  )
{
    /* Summary of the steps performed below:
     *   1. Resolve the top scope, the blob scope (when BLOB is non-null) and the file communicator.
     *   2. Pick the dataset creation property list: the caller's "dcpl" property, or a freshly created chunked list when
     *      SS_BLOB_EXTEND is requested, or H5P_DEFAULT.
     *   3. Compute this task's element count and the total element count according to SS_ALLSAME / SS_BLOB_RANK /
     *      default summation, and the per-task starting offset.
     *   4. Create a one-dimensional dataset (named by the "name" property, or a "TEMP" link that is immediately unlinked)
     *      and bind it and a file-space selection to the blob with ss_blob_bind_f().
     * The function leaves through SS_LEAVE(0); failures are reported with SS_ERROR/SS_ERROR_FMT and the handles listed
     * after SS_CLEANUP are released there. */
    SS_ENTER(ss_blob_mkstorage, herr_t);
    ss_scope_t          topscope=SS_SCOPE_NULL; /* Top scope for BLOB */
    ss_scope_t          blobscope=SS_SCOPE_NULL;/* Scope in which the blob lives if this is a blob task */
    ss_gblob_t          *gblob=NULL;            /* The gblob table from the top scope's gfile entry */
    hid_t               dcpl=H5P_DEFAULT;       /* Dataset creation property list */
    hid_t               dcpl_close=-1;          /* Dataset creation property list to close during cleanup */
    long                my_nelmts_l=0;          /* Should be hssize_t but long is the biggest MPI type supported by reduce */
    long                all_nelmts_l=0;         /* Sum of all my_nelmts across all tasks in the file communicator. */
    hsize_t             my_nelmts;              /* Number of dataset elements to be owned by the BLOB */
    hsize_t             all_nelmts;             /* Same as all_nelmts_l except type is hsize_t for passing ptr to HDF5 */
    hsize_t             dset_size;              /* Number of elements in the created dataset */
    hsize_t             dset_maxsize;           /* The maximum size in elements of the created dataset */
    hid_t               dset=-1, fspace=-1;     /* Dataset and space to be cleaned up on return */
    hid_t               fid=-1;                 /* File handle, not to be closed on return */
    MPI_Comm            filecomm;               /* File communicator */
    int                 root;                   /* Root task w.r.t. file communicator for broadcast */
    int                 blob_self=-1;           /* Rank of calling task in the blob communicator, or negative */
    int                 blob_ntasks=0;          /* Number of tasks in the blob communicator, or zero */
    int                 file_ntasks=0;          /* Number of tasks in the file communicator. */
    size_t              elmt_size;              /* Size of a single element of the dataset in the file */
    hsize_t             chunk_size=64*1024;     /* Size of chunk: 64 kB adjusted later to elements instead of bytes */
    const char          *dsetname=NULL;         /* Optional additional dataset name */
    ss_file_t           blobfile=SS_FILE_NULL;  /* The file in which BLOB appears */
    hsize_t             offset;                 /* Blob's offset into the dataset */

    /* ss_mpi_extras() receives the address of BLOB and may therefore change it; every later use of BLOB is guarded by a
     * null test or assumes a real blob. NOTE(review): presumably BLOB becomes NULL on tasks that passed a top scope
     * instead of a blob -- confirm against ss_mpi_extras(). */
    if (!ss_mpi_extras((ss_pers_t**)&blob, &topscope)) SS_ERROR(FAILED);
    gblob = SS_GFILE_LINK(&topscope)->gblob;
    if (blob) SS_ASSERT_MEM(blob, ss_blob_t);

    /* SS_ALLSAME, SS_BLOB_EACH and SS_BLOB_RANK are mutually exclusive: at most one may be set. */
    SS_ASSERT(((flags & SS_ALLSAME)?1:0) + ((flags & SS_BLOB_EACH)?1:0) + ((flags & SS_BLOB_RANK)?1:0) <= 1);

    /* Blob and file communicator info */
    if (blob) {
        if (NULL==ss_pers_scope((ss_pers_t*)blob, &blobscope)) SS_ERROR(FAILED);
        if (ss_scope_comm(&blobscope, NULL, &blob_self, &blob_ntasks)<0) SS_ERROR(FAILED);
    }
    if (ss_scope_comm(&topscope, &filecomm, NULL, &file_ntasks)<0) SS_ERROR(FAILED);

    /* Dataset creation properties -- same for *all* tasks */
    if (!props || NULL==ss_prop_get(props, "dcpl", H5T_NATIVE_HID, &dcpl)) {
        /* No property list was supplied. An extendible dataset needs a chunked layout, so build one here; otherwise the
         * H5P_DEFAULT initial value of DCPL is used. */
        SS_STATUS_OK;
        if (flags & SS_BLOB_EXTEND) {
            if ((dcpl=dcpl_close=H5Pcreate(H5P_DATASET_CREATE))<0) SS_ERROR(HDF5);
            /* NOTE(review): BLOB is dereferenced without a null test here -- confirm non-blob tasks cannot reach this. */
            if (0==(elmt_size=H5Tget_size(SS_BLOB(blob)->m.mtype))) SS_ERROR(HDF5);
            /* Convert the 64 kB byte budget to a whole number of elements (rounded to nearest, at least one). */
            chunk_size = MAX(1, (chunk_size+elmt_size/2)/elmt_size);
            if (H5Pset_chunk(dcpl, 1, &chunk_size)<0) SS_ERROR(HDF5);
        }
    } else if ((flags & SS_BLOB_EXTEND) && H5D_CHUNKED!=H5Pget_layout(dcpl)) {
        /* A caller-supplied list only has to be chunked when the dataset is extendible. Without SS_BLOB_EXTEND any layout
         * is acceptable, so the layout test must not run in that case. */
        SS_ERROR_FMT(USAGE, ("SS_BLOB_EXTEND flag set but supplied dataset creation property list is not chunked"));
    }

    /* Optional dataset name */
    if (!props || NULL==ss_prop_get(props, "name", H5T_NATIVE_VOIDP, &dsetname)) {
        SS_STATUS_OK;
        dsetname = NULL;
    }

    /* Determine how much total data we have. We only have to sum this across the scope communicator but we need the result
     * across the file communicator. If the SS_ALLSAME or SS_BLOB_RANK flag is set then the size will be calculated and
     * broadcast from blob task zero. If blob_ntasks==file_ntasks then all calling tasks must necessarily also be blob
     * tasks and we may be able to avoid some communication.
     *
     * NOTE(review): the count buffers are declared `long' but are communicated as MPI_UNSIGNED_LONG. The values are never
     * negative here (a negative point count is treated as an error below), but confirm that the datatype is intentional.
     * NOTE(review): the results of the two ss_mpi_bcast() calls are not checked; its prototype is not visible from this
     * function so no check has been added. */
    if (blob && SS_BLOB(blob)->m.mem) {
        if ((my_nelmts_l = H5Sget_select_npoints(SS_BLOB(blob)->m.mspace))<0) SS_ERROR(HDF5);
    }
    if (flags & SS_ALLSAME) {
        /* All blob tasks are bound to the same size memory, but only one will be used as the total dataset size. */
        if (blob_ntasks!=file_ntasks) {
            if ((root=ss_mpi_elect(blob?(ss_pers_t*)blob:(ss_pers_t*)&topscope))<0) SS_ERROR(FAILED);
            ss_mpi_bcast(&my_nelmts_l, (size_t)1, MPI_UNSIGNED_LONG, root, filecomm);
        }
        all_nelmts_l = my_nelmts_l;
    } else if (flags & SS_BLOB_RANK) {
        /* All blob tasks are bound to the same size memory; total size is the sum across all blob tasks */
        all_nelmts_l = my_nelmts_l * blob_ntasks;
        if (blob_ntasks!=file_ntasks) {
            if ((root=ss_mpi_elect(blob?(ss_pers_t*)blob:(ss_pers_t*)&topscope))<0) SS_ERROR(FAILED);
            ss_mpi_bcast(&all_nelmts_l, (size_t)1, MPI_UNSIGNED_LONG, root, filecomm);
        }
    } else {
        /* Every task may be bound to a different memory size; total size is the sum across all blob tasks */
#ifdef HAVE_PARALLEL
        if (MPI_Allreduce(&my_nelmts_l, &all_nelmts_l, 1, MPI_UNSIGNED_LONG, MPI_SUM, filecomm)) SS_ERROR(MPI);
#else
        all_nelmts_l = my_nelmts_l;
#endif
    }
    my_nelmts = my_nelmts_l;            /* How many elements belong to this task's blob? Zero on non-blob tasks. */
    all_nelmts = all_nelmts_l;          /* How big is the whole dataset? */

    /* If the user is asking for a scanned SIZE then return that now since we're doing communication anyway. We could have
     * done an allgather of the local size and done the reduction and scan in this function, but separate MPI_Allreduce() and
     * MPI_Scan() seem more readable and use less memory. */
    if (flags & SS_ALLSAME) {
        offset = 0;
        if (size) *size = offset;
    } else if (flags & SS_BLOB_RANK) {
        /* Every blob task contributes the same count, so the offset is simply rank times count. */
        offset = blob_self * my_nelmts_l;
        if (size) *size = offset;
    } else {
#ifdef HAVE_PARALLEL
        /* ALL_NELMTS_L is reused as scratch for the inclusive scan; the dataset total was already saved in ALL_NELMTS. */
        if (MPI_Scan(&my_nelmts_l, &all_nelmts_l, 1, MPI_UNSIGNED_LONG, MPI_SUM, filecomm)) SS_ERROR(MPI);
        offset = all_nelmts_l - my_nelmts_l; /* exclusive scan */
#else
        offset = 0;
#endif
        if (size) {
            *size = (flags & SS_BLOB_EACH) ? 0 : offset;
        }
    }

    /* It doesn't make much sense to create a zero-size dataset, but there might be some cases where all tasks just happened
     * to have no data. We have two choices with HDF5: create a zero size extendible dataset or create a very small contiguous
     * dataset. We will use a 1d, contiguous dataset of size one when possible because that occupies the least file space and the
     * dimensionality matches what ss_blob_normalize() returns for a null space. */
    if (flags & SS_BLOB_EXTEND) {
        dset_size = all_nelmts;
        dset_maxsize = H5S_UNLIMITED;
    } else {
        dset_size = MAX(1, all_nelmts);
        dset_maxsize = dset_size;
    }
    if ((fspace=H5Screate_simple(1, &dset_size, &dset_maxsize))<0) SS_ERROR(HDF5);

    /* Create the new dataset using either the name supplied in the property list or a temporary name.
     * NOTE(review): both branches read the datatype through BLOB without a null test -- confirm this is safe on tasks
     * that are not part of the blob's scope communicator. */
    if (dsetname) {
        if (NULL==ss_pers_file((ss_pers_t*)blob, &blobfile)) SS_ERROR(FAILED);
        if ((fid=ss_file_isopen(&blobfile, NULL))<0) SS_ERROR(FAILED);
        if ((dset=H5Dcreate(fid, dsetname, SS_BLOB(blob)->m.mtype, fspace, dcpl))<0) SS_ERROR(HDF5);
    } else {
        /* Anonymous dataset: link it as "TEMP" in the gblob storage group and remove that link right away. */
        if ((dset=H5Dcreate(gblob->storage, "TEMP", SS_BLOB(blob)->m.mtype, fspace, dcpl))<0) SS_ERROR(HDF5);
        if (H5Gunlink(gblob->storage, "TEMP")<0) SS_ERROR(HDF5);
    }

    /* Bind the dataset to the blob(s). In SS_ALLSAME and SS_BLOB_EACH modes the selection is this task's own slab
     * (OFFSET, MY_NELMTS); otherwise the selection is ALL_NELMTS elements with no explicit start. */
    if ((flags & SS_ALLSAME) || (flags & SS_BLOB_EACH)) {
        if (H5Sselect_slab(fspace, H5S_SELECT_SET, (hsize_t)0, &offset, &my_nelmts)<0) SS_ERROR(HDF5);
    } else {
        if (H5Sselect_slab(fspace, H5S_SELECT_SET, (hsize_t)0, NULL, &all_nelmts)<0) SS_ERROR(HDF5);
    }
    if (ss_blob_bind_f(blob, dset, fspace, (flags & SS_BLOB_EXTEND))<0) SS_ERROR(FAILED);

    /* Successful cleanup. DCPL_CLOSE is reset so the cleanup section below cannot close the same handle twice. */
    if (dcpl_close>0) H5Pclose(dcpl_close);
    dcpl_close = -1;

SS_CLEANUP:
    if (dset>0) H5Dclose(dset);
    if (fspace>0) H5Sclose(fspace);
    if (dcpl_close>0) H5Pclose(dcpl_close);
    SS_LEAVE(0);
}
|