1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
 herr_t
 ss_blob_bind_f(ss_blob_t *blob,                 /* The blob to which a dataset is associated. */
                hid_t dset,                      /* The dataset to associate with the blob. The handle is duplicated by SSlib,
                                                  * allowing the caller to close its handle at any time. */
                hid_t dspace,                    /* A selection that describes which elements of DSET are owned by the blob. If
                                                  * non-positive then the entire dataset is owned by the blob. */
                unsigned flags                   /* Various bit flags affecting the operation of this function. */
                )
 {
     SS_ENTER(ss_blob_bind_f, herr_t);
     ss_scope_t          topscope=SS_SCOPE_NULL; /* The top scope for the blob (obtained from blob or passed as BLOB argument) */
     ss_gblob_t          *gblob=NULL;            /* The gfile->gblob pointer for convenience */
     size_t              d_idx=SS_NOSIZE;        /* Index into gblob->d table */
     H5G_stat_t          sb;                     /* Stat buffer for the supplied dataset handle */
     H5G_stat_t          sb_link;                /* Stat buffer for dataset in blob storage directory */
     hid_t               dset_duped=-1;          /* Dataset handle duplicated by this function to be closed on error*/
     hid_t               dtype_duped=-1;         /* Dataset type opened in this function to be closed on error */
     hid_t               dspace_duped=-1;        /* Dataset space opened in this function to be closed on error */
     char                dsetname[64];           /* Dataset name as it appears in blob storage group */
     hbool_t             new_entry=FALSE;        /* Are we appending a new entry to the gblob table? */
     hid_t               dcpl=-1;                /* Dataset creation property list */
     int                 ndims=-1;               /* Dataset dimensionality */
     hsize_t             dsize[H5S_MAX_RANK];    /* Current size of DSET */
     hsize_t             newsize[H5S_MAX_RANK];  /* New required size of the dataset */
     int                 root;                   /* Rank of blob task zero in the file communicator */
     int                 self;                   /* Rank of the calling task in the file communicator */
     MPI_Comm            filecomm=SS_COMM_NULL;  /* The file communicator */
     htri_t              extendible=-1;          /* Is the dataset extendible? */
     int                 i;

     /* Communicator related stuff */
     if (!ss_mpi_extras((ss_pers_t**)&blob, &topscope)) SS_ERROR(FAILED);
     if (blob) SS_ASSERT_MEM(blob, ss_blob_t);
     else dspace = -1;
     if (ss_scope_comm(&topscope, &filecomm, &self, NULL)<0) SS_ERROR(FAILED);

     if (dset<=0) SS_ERROR_FMT(USAGE, ("no dataset handle"));
     if (H5Gget_objinfo(dset, ".", FALSE, &sb)<0) SS_ERROR(FAILED);
     gblob = SS_GFILE_LINK(&topscope)->gblob;

     /* Mark the blob as dirty here near the beginning of the function so we don't have to worry about it anymore. This makes
      * it easy if an error occurs half way through the function. No need to mark as unsynchronized since a precondition is
      * that we're being called collectively with all tasks having compatible arguments. */
     if (blob) SS_BLOB(blob)->m.pers.dirty = TRUE;

     /* Unbind any old dataset from the blob. We do this fairly early because we'd like to have it unbound if there was an
      * error in binding. Do not unbind memory. */
     if (blob && SS_BLOB(blob)->dsetaddr) {
         if (SS_NOSIZE==(d_idx=ss_blob_didx(blob))) SS_ERROR(FAILED);
         SS_ASSERT(ss_eq_nos(gblob->d[d_idx].stat.objno,sb.objno));
         SS_BLOB(blob)->dsetaddr = 0;
         memset(SS_BLOB(blob)->start, 0, sizeof(SS_BLOB(blob)->start));
         memset(SS_BLOB(blob)->count, 0, sizeof(SS_BLOB(blob)->count));
     }

     /* Make sure that the data space is suitable. Save the selection starts and counts in the blob object. */
     memset(SS_BLOB(blob)->start, 0, sizeof(SS_BLOB(blob)->start));
     memset(SS_BLOB(blob)->count, 0, sizeof(SS_BLOB(blob)->count));
     if (blob && (ndims=ss_blob_ckspace(dspace, SS_MAXDIMS, NULL, SS_BLOB(blob)->start, SS_BLOB(blob)->count, NULL))<0)
         SS_ERROR(FAILED);

     /* Is the dataset extendible? */
     if ((dcpl=H5Dget_create_plist(dset))<0) SS_ERROR(HDF5);
     extendible = H5D_CHUNKED == H5Pget_layout(dcpl);
     if (H5Pclose(dcpl)<0) SS_ERROR(HDF5);
     dcpl = -1;

     /* If the SS_BLOB_EXTENDIBLE flag is set then make sure the dataset is extendible */
     if ((flags & SS_BLOB_EXTEND) && !extendible)
         SS_ERROR_FMT(HDF5, ("SS_BLOB_EXTENDIBLE set but dataset is not extendible"));

     /* Every task allocates a record to the gblob table for the file, or updates a record if the dataset is already there
      * because some other blob is (or was) bound to it. */
     for (d_idx=0; d_idx<gblob->d_nused; d_idx++) {
         if (ss_eq_nos(gblob->d[d_idx].stat.objno,sb.objno) && ss_eq_nos(gblob->d[d_idx].stat.fileno,sb.fileno)) {
             break;
         }
     }
     if (d_idx >= gblob->d_nused) {
         SS_EXTEND(gblob->d, MAX(64,gblob->d_nused+1), gblob->d_nalloc);
         d_idx = gblob->d_nused; /*increment on success*/
         memset(gblob->d + d_idx, 0, sizeof(gblob->d[0]));
         new_entry = TRUE;
     }

     /* If the dataset was not already in the gblob table then make sure the dataset has a name in the blob storage group and
      * duplicate the dataset handle. Also obtain the datatype (see ss_blob_boot_cb() for reason). */
     if (gblob->d[d_idx].dset <= 0) {
         gblob->d[d_idx].stat = sb;
         SS_ASSERT(gblob->storage>0);
         /*sprintf(dsetname, "%lu", (unsigned long)(sb.objno));*/
         sprintf(dsetname, "%lu", (unsigned long)sb.objno[0]);
         if (H5Gget_objinfo(gblob->storage, dsetname, FALSE, &sb_link)>=0) {
             if (!ss_eq_nos(sb.objno,sb_link.objno)) SS_ERROR_FMT(CORRUPT, ("mis-linked blob storage for `%s'", dsetname));
         } else {
             if (H5Glink2(dset, ".", H5G_LINK_HARD, gblob->storage, dsetname)<0) SS_ERROR(HDF5);
         }
         if ((gblob->d[d_idx].dset = dset_duped = H5Dopen(dset, "."))<0) SS_ERROR(HDF5);
         if ((gblob->d[d_idx].dtype = dtype_duped = H5Dget_type(dset_duped))<0) SS_ERROR(HDF5);
         if ((gblob->d[d_idx].dspace = dspace_duped = H5Dget_space(dset_duped))<0) SS_ERROR(HDF5);
         gblob->d[d_idx].is_extendible = extendible;
     }

     /* If the SS_BLOB_EXTEND bit is set then extend the dataset if necessary so that the blob fits inside it. We can only make
      * the determination on the blob tasks but all file tasks need to know the result in order to call H5Dextend(). To further
      * complicate matters, the non-blob tasks don't know which task is the blob's task zero. */
     if (flags & SS_BLOB_EXTEND) {
         if ((ndims=ss_blob_ckspace(gblob->d[d_idx].dspace, SS_MAXDIMS, dsize, NULL, NULL, NULL))<0) SS_ERROR(FAILED);
         if ((root=ss_mpi_elect(blob?(ss_pers_t*)blob:(ss_pers_t*)&topscope))<0) SS_ERROR(FAILED);
         if (blob && root==self) {
             for (i=0; i<ndims; i++) {
                 newsize[i] = MAX(dsize[i], SS_BLOB(blob)->start[i]+SS_BLOB(blob)->count[i]);
             }
         }
         if (ss_mpi_bcast(newsize, (size_t)ndims, MPI_HSIZE_T, root, filecomm)<0) SS_ERROR(FAILED);
         for (i=0; i<ndims; i++) {
             if (newsize[i]>dsize[i]) {
                 if (H5Dextend(gblob->d[d_idx].dset, newsize)<0) SS_ERROR(HDF5);
                 if (H5Sclose(gblob->d[d_idx].dspace)<0) SS_ERROR(HDF5);
                 if ((gblob->d[d_idx].dspace=H5Dget_space(gblob->d[d_idx].dset))<0) SS_ERROR(HDF5);
                 if (dspace_duped>0) dspace_duped = gblob->d[d_idx].dspace;
                 if (ss_blob_async_aggregators(filecomm, gblob, d_idx, ndims, newsize)<0) SS_ERROR(FAILED);
                 break;
             }
         }
     }


     /* Final adjustments to the blob */
     if (blob) {
         SS_ASSERT(ss_eq_nos(sb.objno,gblob->d[d_idx].stat.objno));
         SS_BLOB(blob)->dsetaddr = *((haddr_t*)sb.objno);
         SS_BLOB(blob)->m.d_idx = d_idx;
     }

     /* Adjust counters just before successful return */
     if (new_entry) gblob->d_nused++;

  SS_CLEANUP:
     if (dset_duped>0) H5Dclose(dset_duped);
     if (dtype_duped>0) H5Tclose(dtype_duped);
     if (dspace_duped>0) H5Sclose(dspace_duped);
     if (dcpl>0) H5Pclose(dcpl);
     SS_LEAVE(0);
 }