1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 | herr_t
ss_blob_synchronize(ss_scope_t UNUSED_SERIAL *topscope,
ss_prop_t UNUSED_SERIAL *props /* See [Blob Properties]. */
)
{
SS_ENTER(ss_blob_synchronize, herr_t);
#ifdef HAVE_PARALLEL
ss_gblob_t *gblob=NULL; /* Blob related file-level information */
size_t a_idx; /* Index into gblob->a table */
size_t a_start=0; /* Virtual start of the gblob->a[] so we don't have to shift elmts */
size_t a_start_orig=0; /* Original starting value of a_start */
size_t d_idx; /* Index into gblob->d[] for dataset on which we're operating */
size_t *ndsets=NULL; /* Array of dataset counts for all tasks */
size_t ndsets_total; /* Sum-reduction of ndsets[] */
size_t req_idx; /* Index through the I/O requests */
size_t nreq_total; /* Total I/O requests across all tasks */
MPI_Comm filecomm; /* File communicator */
int self, ntasks; /* Rank and size of file communicator */
int task; /* MPI task counter */
hsize_t *sel=NULL; /* Selection metadata (starts and counts per dimension per request) */
size_t sel_nalloc=0; /* Number of elements allocated for `sel' array */
size_t *sel_start=NULL; /* Exclusive sum scan for selection metadata counts */
size_t *sel_count=NULL; /* Number of selection metadata values by task */
size_t sel_idx; /* Index into `sel' array */
int ndims; /* Dataset dimensionality */
haddr_t smallest_objno; /* Object header address of current dataset */
hsize_t dset_size[H5S_MAX_RANK]; /* Current size of the dataset */
hsize_t bound_start[H5S_MAX_RANK]; /* Selection bounding box starting offsets */
hsize_t bound_end[H5S_MAX_RANK]; /* bound_start + bound_count */
hid_t one_dset=-1; /* Non-copied handle to the dataset being synchronized */
haddr_t one_dset_addr=0; /* Nonzero if only synchronizing one dataset */
H5G_stat_t stat; /* Status info for the one_dset */
int i, j;
/* WARNING! DO NOT CHANGE THIS STRUCT W/O CHANGING THE dset_info_mt MPI DATATYPE! */
struct dset_info_t {
haddr_t objno; /* HDF5 address for dataset object header */
size_t nreq; /* Number of I/O requests for this dataset */
} *dset_info=NULL; /* Information exchanged between MPI tasks */
MPI_Datatype dset_info_mt=MPI_DATATYPE_NULL; /* MPI datatype for struct dset_info_t elements */
gblob = SS_GFILE_LINK(topscope)->gblob;
if (ss_scope_comm(topscope, &filecomm, &self, &ntasks)<0) SS_ERROR(FAILED);
/* Are the callers synchronizing blobs for a particular dataset or all blobs in the file? */
if (!props || NULL==ss_prop_get(props, "dset", H5T_NATIVE_HID, &one_dset)) {
SS_STATUS_OK;
one_dset = -1;
one_dset_addr = 0;
} else {
if (H5Gget_objinfo(one_dset, ".", FALSE, &stat)<0) SS_ERROR(HDF5);
one_dset_addr = *((haddr_t*)stat.objno);
}
/* Sort async list by dataset address but keep ordering per dataset */
SS_ASSERT(!ss_blob_async_sort_g);
ss_blob_async_sort_g = gblob;
qsort(gblob->a, gblob->a_nused, sizeof(gblob->a[0]), ss_blob_async_sort_cb);
ss_blob_async_sort_g = NULL;
/* Combine adjacent requests */
/* ISSUE: This function makes no attempt to combine separate write requests from a single task into a single request. */
/* Allocate arrays that we'll need later */
if (NULL==(ndsets=malloc(ntasks*sizeof(*ndsets)))) SS_ERROR(RESOURCE);
if (NULL==(dset_info=malloc(ntasks*sizeof(*dset_info)))) SS_ERROR(RESOURCE);
if (NULL==(sel_start=malloc((ntasks+1)*sizeof(*sel_start)))) SS_ERROR(RESOURCE);
if (NULL==(sel_count=malloc(ntasks*sizeof(*sel_count)))) SS_ERROR(RESOURCE);
/* Create MPI datatype for dset_info elements */
{
static int counts[4] = {1, 1, 1, 1};
static MPI_Aint starts[4] = {0, offsetof(struct dset_info_t, objno),
offsetof(struct dset_info_t, nreq), sizeof(struct dset_info_t)};
static MPI_Datatype types[4];
if (!types[0]) {
types[0] = MPI_LB;
types[1] = MPI_HADDR_T;
types[2] = MPI_SIZE_T;
types[3] = MPI_UB;
}
if (MPI_Type_struct(4, counts, starts, types, &dset_info_mt)) SS_ERROR(MPI);
if (MPI_Type_commit(&dset_info_mt)) SS_ERROR(MPI);
}
/* How many unique datasets does each task have to process? */
for (a_idx=0, ndsets[self]=0; a_idx<gblob->a_nused; a_idx++) {
if (one_dset_addr>0) {
if (*((haddr_t*)(&gblob->d[gblob->a[a_idx].d_idx].stat.objno)[0])==one_dset_addr) {
a_start = a_idx;
ndsets[self] = 1;
break;
}
} else if (0==a_idx) {
ndsets[self]++;
} else if (!ss_eq_nos(gblob->d[gblob->a[a_idx].d_idx].stat.objno,gblob->d[gblob->a[a_idx-1].d_idx].stat.objno)) {
ndsets[self]++;
}
}
if (ss_mpi_allgather(ndsets, 1, MPI_SIZE_T, filecomm)<0) SS_ERROR(FAILED);
for (task=0, ndsets_total=0; task<ntasks; task++) ndsets_total += ndsets[task];
/* Process each dataset collectively. We could have done one big MPI_Allgather() of all the information we need for every
* dataset, but that could be an awful lot of dataset I/O records. So instead we locally sort each gblob->a[] and then
* repeatedly collectively choose the lowest-numbered dataset. */
a_start_orig = a_start;
while (ndsets_total) {
/* Choose the lowest numbered dataset. We can't exchange information about the elements in the data space selection
* until we know how many selections each task has. Pass zero for the `nreq' field if we don't have any more datasets. */
if (a_start<gblob->a_nused) {
dset_info[self].objno = gblob->d[gblob->a[a_start].d_idx].stat.objno;
for (a_idx=1; a_start+a_idx<gblob->a_nused; a_idx++) {
if (*((haddr_t*)(&gblob->d[gblob->a[a_start+a_idx].d_idx].stat.objno)[0])!=dset_info[self].objno) break;
}
dset_info[self].nreq = a_idx;
} else {
memset(dset_info+self, 0, sizeof(*dset_info));
}
if (ss_mpi_allgather(dset_info, 1, dset_info_mt, filecomm)<0) SS_ERROR(FAILED);
/* Choose the dataset with the lowest object header address or the one that matches one_dset_addr. */
if (one_dset_addr) {
smallest_objno = one_dset_addr;
} else {
for (task=0, smallest_objno=0; task<ntasks; task++) {
if (dset_info[task].nreq>0) {
if (smallest_objno==0) {
smallest_objno = dset_info[task].objno;
} else if (dset_info[task].objno < smallest_objno) {
smallest_objno = dset_info[task].objno;
}
}
}
SS_ASSERT(smallest_objno>0); /* someone must have a dataset since ndsets_total is nonzero */
}
/* Find information about the dataset by looking for the appropriate entry in the gblob->d array. That array should
* probably be sorted, but we can't easily sort it here because the gblob->a array contains indices into the gblob->d
* array. */
for (d_idx=0; d_idx<gblob->d_nused; d_idx++) if (*((haddr_t*)(&gblob->d[d_idx].stat.objno)[0])==smallest_objno) break;
SS_ASSERT(d_idx<gblob->d_nused);
if ((ndims=H5Sget_simple_extent_dims(gblob->d[d_idx].dspace, dset_size, NULL))<0) SS_ERROR(HDF5);
/* Clean up the exchanged dset_info() by pruning out all entries other than smallest_objno. At the same time compute
* an exclusive sum scan of the number of integers that must be exchanged in order to fully describe the data space
* selection of all of the I/O requests for this dataset (see below). */
for (task=0; task<=ntasks; task++) {
if (task<ntasks) {
if (smallest_objno!=dset_info[task].objno) memset(dset_info+task, 0, sizeof(*dset_info));
sel_count[task] = 2 * ndims * dset_info[task].nreq; /* start & count per dimension per request */
}
sel_start[task] = task ? sel_start[task-1] + sel_count[task-1] : 0;
}
/* Allocate the `sel' array. */
SS_ASSERT(0 == sel_start[ntasks] % (2*ndims));
nreq_total = sel_start[ntasks] / (2*ndims);
SS_EXTEND(sel, sel_start[ntasks], sel_nalloc);
/* Exchange information about the elements selected for the I/O operation for the smallest dataset. Each task will
* broadcast a list of starts (per dimension) and a list of counts (per dimension) repeated once for each I/O request.
* For each I/O request we group the starts together and the counts together (instead of starts and counts
* interleaved) because that's how the various HDF5 data space functions take the arguments. So we don't have to
* shuffle things around as much. */
for (a_idx=0; a_idx<dset_info[self].nreq; a_idx++) {
if (ss_blob_ckspace(gblob->a[a_start+a_idx].iof, ndims, NULL,
sel+sel_start[self]+a_idx*ndims*2+0/*starts*/,
sel+sel_start[self]+a_idx*ndims*2+ndims/*counts*/, NULL)<0) SS_ERROR(FAILED);
}
if (ss_mpi_allgatherv(sel, sel_count, sel_start, MPI_HSIZE_T, filecomm)) SS_ERROR(MPI);
/* Find the minimum bounding contiguous hyperslab for the I/O request and represent that with a multi-dimensional
* start and end stored in bound_start[] and bound_end[] */
for (i=0; i<ndims; i++) {
bound_start[i] = sel[i];
bound_end[i] = bound_start[i] + sel[ndims+i];
}
for (req_idx=1; req_idx<nreq_total; req_idx++) {
for (i=0; i<ndims; i++) {
bound_start[i] = MIN(bound_start[i], sel[req_idx*ndims*2+i]);
bound_end[i] = MAX(bound_end[i], sel[req_idx*ndims*2+i]+sel[req_idx*ndims*2+ndims+i]);
}
}
/* Extend dataset and reassign aggregators if appropriate */
for (i=0; i<ndims; i++) {
if (bound_end[i] > dset_size[i]) {
for (j=0; j<ndims; j++) dset_size[j] = MAX(dset_size[j], bound_end[j]);
if (H5Dextend(gblob->d[d_idx].dset, dset_size)<0) SS_ERROR(HDF5);
if (H5Sclose(gblob->d[d_idx].dspace)<0) SS_ERROR(HDF5);
if ((gblob->d[d_idx].dspace=H5Dget_space(gblob->d[d_idx].dset))<0) SS_ERROR(HDF5);
if (ss_blob_async_aggregators(filecomm, gblob, d_idx, ndims, dset_size)<0) SS_ERROR(FAILED);
break;
}
}
/* If no aggregators are assigned yet then do that now. This takes care of the case when this is the first time
* 2-phase I/O is performed on this dataset. */
if (!gblob->d[d_idx].agg.tasks &&
ss_blob_async_aggregators(filecomm, gblob, d_idx, ndims, dset_size)<0) SS_ERROR(FAILED);
/* ISSUE: The data shipping code uses MPI async p2p functions even when the source and destination are the same task. */
/* ISSUE: This function does not attempt to optimize the case when ss_blob_write() was called with the SS_ALLSAME bit
* flag. When this bit is set all blob tasks will have called ss_blob_write() with identical data and offsets and
* it may therefore be the case that an aggregation task has the data already available locally. */
/* If the current task is an aggregator then post all receives for data. Allocation of the aggregation buffer might
* cause us to block until I/O for some other dataset completes, hence *all* sends on other tasks to this task must be
* posted for all prior datasets for which this task is an aggregator. */
for (task=0; task<ntasks; task++) {
for (sel_idx=sel_start[task]; sel_idx<sel_start[task+1]; sel_idx+=ndims*2) {
hsize_t *req_dstart = sel + sel_idx; /* ultimate position of request in the dataset */
hsize_t *req_count = sel + sel_idx + ndims; /* size of the request */
ss_blob_stride_t agg_stride[H5S_MAX_RANK*2]; /* desc of intersection of request with aggregation buffer */
int nstrides = ss_blob_async_intersect(gblob, d_idx, self, dset_size, req_dstart, NULL, NULL,
req_count, agg_stride, NULL);
if (nstrides<0) SS_ERROR(FAILED);
if (ss_blob_async_receives(gblob, d_idx, task, filecomm, nstrides, agg_stride)<0) SS_ERROR(FAILED);
}
}
/* If the current task has data then post sends to the appropriate aggregators */
for (a_idx=0; a_idx<dset_info[self].nreq; a_idx++) {
hsize_t *req_dstart = sel + sel_start[self] + a_idx*ndims*2; /* ultimate position of request in the dataset */
hsize_t *req_count = sel + sel_start[self] + a_idx*ndims*2 + ndims; /* size of the request */
ss_blob_stride_t send_stride[H5S_MAX_RANK*2]; /* desc of send buffer / aggregation buffer intersection */
hsize_t req_dfirst_1 = ss_blob_array_linear(ndims, dset_size, req_dstart, NULL);
hsize_t req_dlast_1 = ss_blob_array_linear(ndims, dset_size, req_dstart, req_count);
hsize_t min_aggseq = req_dfirst_1 / gblob->d[d_idx].agg.elmts_per_agg;
hsize_t max_aggseq = req_dlast_1 / gblob->d[d_idx].agg.elmts_per_agg;
hsize_t mem_size[H5S_MAX_RANK], req_mstart[H5S_MAX_RANK];
hsize_t aggseq;
int aggtask, nstrides;
SS_ASSERT(max_aggseq<gblob->d[d_idx].agg.n);
if (ss_blob_ckspace(gblob->a[a_start+a_idx].iom, ndims, mem_size, req_mstart, NULL, NULL)<0) SS_ERROR(FAILED);
for (aggseq=min_aggseq; aggseq<=max_aggseq; aggseq++) {
aggtask = gblob->d[d_idx].agg.tasks[aggseq];
nstrides = ss_blob_async_intersect(gblob, d_idx, aggtask, dset_size, req_dstart, mem_size, req_mstart,
req_count, NULL, send_stride);
if (nstrides<0) SS_ERROR(FAILED);
if (ss_blob_async_sends(gblob, d_idx, aggtask, filecomm, nstrides, send_stride,
gblob->a[a_start+a_idx].buffer, gblob->a[a_start+a_idx].flags)<0)
SS_ERROR(FAILED);
gblob->a[a_start+a_idx].flags &= ~SS_BLOB_FREE; /* should only be marked for freeing once! */
}
}
/* Update dataset counters based on which tasks have requests for this dataset */
a_start += dset_info[self].nreq;
for (task=0; task<ntasks; task++) {
if (dset_info[task].objno) --ndsets_total;
}
}
/* Delete all the async requests that were just processed */
for (a_idx=a_start_orig; a_idx<a_start; a_idx++) {
H5Tclose(gblob->a[a_idx].mtype);
H5Sclose(gblob->a[a_idx].iom);
H5Sclose(gblob->a[a_idx].iof);
/* Do not free gblob->a[a_idx].buffer because we are sending that data to the aggregators. The caller is
* responsible for freeing that if desired once they know the data shipping has been completed. */
}
H5Eclear(); /*we don't care about errors in the above loop*/
memmove(gblob->a+a_start_orig, gblob->a+a_start, (gblob->a_nused-a_start)*sizeof(gblob->a[0]));
gblob->a_nused -= a_start - a_start_orig;
SS_FREE(ndsets);
SS_FREE(dset_info);
SS_FREE(sel_count);
SS_FREE(sel_start);
SS_FREE(sel);
if (MPI_Type_free(&dset_info_mt)) SS_ERROR(MPI);
dset_info_mt = MPI_DATATYPE_NULL;
SS_CLEANUP:
/* ISSUE: Lots of error-related stuff needs to go here! */
#endif /*HAVE_PARALLEL*/
SS_LEAVE(0);
}
|