Module pyminflux.writer
Writer of processed MINFLUX data.
Classes
class MinFluxWriter-
Expand source code
class MinFluxWriter:
    """Writer of (filtered) MINFLUX data to either `.npy` or `.csv` formats."""

    # The description used to be assigned to the misspelled name `__docs__`,
    # which left the real class docstring empty. The old attribute is kept
    # as an alias so that any existing reference to it still resolves.
    __docs__ = __doc__

    @staticmethod
    def write_npy(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
        """Write (filtered) data as a NumPy structured array with the same structure as in the Imspector `.npy` file.

        Parameters
        ----------
        processor: MinFluxProcessor
            Processor whose `filtered_raw_data_array` is saved.
        file_name: Union[Path, str]
            Destination; converted to `str` and handed to `np.save()`.

        Returns
        -------
        bool
            True if the array was saved. False if the processor has no
            filtered raw array, or if saving raised (the error is printed).
        """

        # Get the raw data
        filtered_array = processor.filtered_raw_data_array
        if filtered_array is None:
            return False

        # Save the filtered structured NumPy array to disk
        try:
            np.save(str(file_name), filtered_array)
        except Exception as e:
            print(f"Could not save {file_name}: {e}")
            return False

        return True

    @staticmethod
    def write_csv(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
        """Write (filtered) data as a comma-separated-value `.csv` file.

        The table may be preceded by `#`-prefixed comment lines:

        * the fluorophore names, only when at least one name differs from
          the string form of its id;
        * the TID offsets, when `processor.dataset` has a non-empty
          `tid_offsets` attribute.

        Parameters
        ----------
        processor: MinFluxProcessor
            Processor providing `filtered_dataframe`, `fluorophore_names`
            and `dataset`.
        file_name: Union[Path, str]
            Destination file; it is overwritten if it exists.

        Returns
        -------
        bool
            True if the file was written. False if there is no filtered
            dataframe (no file is created in that case) or if anything
            raised while preparing or writing (the error is printed).
        """
        import os

        try:
            # Check for data before touching the disk, so that a processor
            # without a dataframe does not leave an empty file behind.
            dataframe = processor.filtered_dataframe
            if dataframe is None:
                return False

            # Prepare header comment with fluorophore names
            header_lines = []
            fluorophore_names = processor.fluorophore_names
            if fluorophore_names:
                # Only include header comments when names are custom (not default "<id>")
                has_custom_names = any(
                    name != str(fluo_id) for fluo_id, name in fluorophore_names.items()
                )
                if has_custom_names:
                    header_lines.append("# Fluorophore Names:")
                    header_lines.extend(
                        f"# fluo_{fluo_id}={fluorophore_names[fluo_id]}"
                        for fluo_id in sorted(fluorophore_names)
                    )

            tid_offsets = getattr(processor.dataset, "tid_offsets", [])
            if tid_offsets:
                header_lines.append("# TID Offsets (first_iid -> tid_offset):")
                header_lines.extend(
                    f"# iid>={first_iid} : +{tid_offset}"
                    for first_iid, tid_offset in sorted(tid_offsets, key=lambda x: x[0])
                )

            # pandas asks for file objects to be opened with newline="";
            # otherwise the line terminators it emits are translated a second
            # time on platforms where os.linesep is not "\n". The encoding is
            # pinned so that non-ASCII fluorophore names do not depend on the
            # locale.
            with open(file_name, "w", newline="", encoding="utf-8") as f:
                # With newline translation disabled, terminate the header
                # lines with os.linesep explicitly.
                if header_lines:
                    f.write(os.linesep.join(header_lines) + os.linesep)

                # Write the dataframe
                dataframe.to_csv(f, index=False)

        except Exception as e:
            print(f"Could not save {file_name}: {e}")
            return False

        return True

# Static methods
def write_csv(processor: pyminflux.processor.MinFluxProcessor,
file_name: pathlib.Path | str) ‑> bool-
Expand source code
@staticmethod
def write_csv(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
    """Write (filtered) data as a comma-separated-value `.csv` file.

    The table may be preceded by `#`-prefixed comment lines:

    * the fluorophore names, only when at least one name differs from
      the string form of its id;
    * the TID offsets, when `processor.dataset` has a non-empty
      `tid_offsets` attribute.

    Parameters
    ----------
    processor: MinFluxProcessor
        Processor providing `filtered_dataframe`, `fluorophore_names`
        and `dataset`.
    file_name: Union[Path, str]
        Destination file; it is overwritten if it exists.

    Returns
    -------
    bool
        True if the file was written. False if there is no filtered
        dataframe (no file is created in that case) or if anything
        raised while preparing or writing (the error is printed).
    """
    import os

    try:
        # Check for data before touching the disk, so that a processor
        # without a dataframe does not leave an empty file behind.
        dataframe = processor.filtered_dataframe
        if dataframe is None:
            return False

        # Prepare header comment with fluorophore names
        header_lines = []
        fluorophore_names = processor.fluorophore_names
        if fluorophore_names:
            # Only include header comments when names are custom (not default "<id>")
            has_custom_names = any(
                name != str(fluo_id) for fluo_id, name in fluorophore_names.items()
            )
            if has_custom_names:
                header_lines.append("# Fluorophore Names:")
                header_lines.extend(
                    f"# fluo_{fluo_id}={fluorophore_names[fluo_id]}"
                    for fluo_id in sorted(fluorophore_names)
                )

        tid_offsets = getattr(processor.dataset, "tid_offsets", [])
        if tid_offsets:
            header_lines.append("# TID Offsets (first_iid -> tid_offset):")
            header_lines.extend(
                f"# iid>={first_iid} : +{tid_offset}"
                for first_iid, tid_offset in sorted(tid_offsets, key=lambda x: x[0])
            )

        # pandas asks for file objects to be opened with newline="";
        # otherwise the line terminators it emits are translated a second
        # time on platforms where os.linesep is not "\n". The encoding is
        # pinned so that non-ASCII fluorophore names do not depend on the
        # locale.
        with open(file_name, "w", newline="", encoding="utf-8") as f:
            # With newline translation disabled, terminate the header
            # lines with os.linesep explicitly.
            if header_lines:
                f.write(os.linesep.join(header_lines) + os.linesep)

            # Write the dataframe
            dataframe.to_csv(f, index=False)

    except Exception as e:
        print(f"Could not save {file_name}: {e}")
        return False

    return True

# Write (filtered) data as a comma-separated-value
`.csv` file.
def write_npy(processor: pyminflux.processor.MinFluxProcessor,
file_name: pathlib.Path | str) ‑> bool-
Expand source code
@staticmethod
def write_npy(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
    """Save the processor's filtered raw data as a NumPy `.npy` file.

    The array is the structured array exposed by
    `processor.filtered_raw_data_array`, i.e. the same layout as the
    Imspector `.npy` file.

    Parameters
    ----------
    processor: MinFluxProcessor
        Processor whose `filtered_raw_data_array` is saved.
    file_name: Union[Path, str]
        Destination; converted to `str` and handed to `np.save()`.

    Returns
    -------
    bool
        True if the array was saved. False if there is no array to save,
        or if saving raised (the error is printed).
    """
    raw_array = processor.filtered_raw_data_array

    # Nothing to write
    if raw_array is None:
        return False

    try:
        np.save(str(file_name), raw_array)
    except Exception as err:
        print(f"Could not save {file_name}: {err}")
        return False
    else:
        return True

# Write (filtered) data as a NumPy structured array with the same structure as in the Imspector
`.npy` file.
class PMXWriter (processor: pyminflux.processor.MinFluxProcessor)-
Expand source code
class PMXWriter:
    """Writer of (processed) MINFLUX into native `.pmx` format."""

    def __init__(self, processor: MinFluxProcessor):
        """Keep a reference to the processor and set up the writer.

        Parameters
        ----------
        processor: MinFluxProcessor
            Processor whose data and parameters are written by `write()`.
        """
        self.processor = processor
        self.state = State()
        self._message = ""

        # HDF5 has no native support for boolean datatypes; therefore
        # we convert back and forth between bool and np.int8
        self.boolean_columns = ["vld", "fnl", "bot", "eot"]

    def write(self, file_name: Union[Path, str]) -> bool:
        """Write the processor's data and parameters to an HDF5 file.

        The file gets the attributes `file_version` and `reader_version`
        and the groups `raw`, `parameters` and `paraview`.

        Parameters
        ----------
        file_name: Union[Path, str]
            Destination file, opened with mode "w".

        Returns
        -------
        bool
            True on success (`message` is reset to an empty string). False
            if anything raised; the text of the exception is then available
            through `message`.
        """

        try:
            # Create HDF5 file with the structure
            with h5py.File(file_name, "w") as f:
                # Set file version attribute
                f.attrs["file_version"] = "3.0"

                # Set MinFluxReader version
                f.attrs["reader_version"] = self.processor.reader.version

                # Create groups
                raw_data_group = f.create_group("raw")
                parameters_group = f.create_group("parameters")
                paraview_group = f.create_group("paraview")

                # If the reader is version 1, we store filtered_raw_data_array
                # as "raw/npy" for backwards compatibility
                if self.processor.reader.version == 1:
                    raw_data_group.create_dataset(
                        "npy",
                        data=self.processor.filtered_raw_data_array,
                        compression="gzip",
                    )
                elif self.processor.reader.version == 2:
                    self._store_dataframe(
                        raw_data_group,
                        self.processor.filtered_raw_dataframe,
                        "df",
                    )
                else:
                    raise ValueError("Unexpected reader version!")

                # Store the filtered dataframe for compatibility with ParaView
                self._store_dataframe(
                    paraview_group,
                    self.processor.filtered_dataframe,
                    "dataframe",
                )

                # Store important parameters
                self._store_parameters(parameters_group)

            # No error
            self._message = ""

            # Return success
            return True

        except Exception as e:
            # Set the error message
            self._message = str(e)
            return False

    @property
    def message(self):
        """Return last error message."""
        return self._message

    def _store_dataframe(self, group, dataframe, datataset_name):
        """Write a Pandas DataFrame in a way that it can be reloaded without external dependencies.

        To support external tools like ParaView (via our reader plug-in) that may not have required
        Pandas dependencies (like "tables"), we save the dataframe as a NumPy array and save the
        column names and types as attributes.

        The passed dataframe is not modified. Nothing is written if it is None.

        Parameters
        ----------
        group:
            Object providing `create_dataset()` (here: an HDF5 group).
        dataframe:
            DataFrame to store, or None.
        datataset_name:
            Name of the dataset holding the values; the index is stored
            next to it as `<datataset_name>_index`.
        """
        if dataframe is None:
            return

        # Convert the column names to a list of strings
        column_names = dataframe.columns.tolist()

        # Convert column data types to a list of strings
        column_types = [str(dataframe[col].dtype) for col in dataframe.columns]

        # Convert boolean types to np.int8 to be hdf5-compatible. The
        # conversion is done on a new frame returned by assign(): assigning
        # into `dataframe` itself would change the caller's data.
        converted = {
            col: np.int8(dataframe[col])
            for col in self.boolean_columns
            if col in dataframe.columns
        }
        if converted:
            dataframe = dataframe.assign(**converted)

        dataset = group.create_dataset(
            datataset_name,
            data=dataframe.to_numpy(),
            compression="gzip",
        )

        # Store the column names as an attribute of the dataset
        dataset.attrs["column_names"] = column_names

        # Store (original) column data types as attribute of the dataset
        dataset.attrs["column_types"] = column_types

        # We preserve the index as well (as a Dataset, since it can be large)
        index_data = np.array(dataframe.index)
        group.create_dataset(
            f"{datataset_name}_index", data=index_data, compression="gzip"
        )

    def _store_parameters(self, group):
        """Write important parameters.

        Scalar parameters and the applied thresholds are stored as datasets
        of `group`; fluorophore names and TID offsets are stored as
        JSON-encoded attributes of `group`.
        """
        import json

        group.create_dataset("z_scaling_factor", data=self.processor.z_scaling_factor)
        group.create_dataset("min_trace_length", data=self.processor.min_trace_length)

        # Thresholds are only stored when they are set in the state
        for name in (
            "applied_efo_thresholds",
            "applied_cfr_thresholds",
            "applied_tr_len_thresholds",
            "applied_time_thresholds",
        ):
            thresholds = getattr(self.state, name)
            if thresholds is not None:
                group.create_dataset(name, data=thresholds)

        group.create_dataset("num_fluorophores", data=self.processor.num_fluorophores)
        group.create_dataset("dwell_time", data=self.state.dwell_time)
        group.create_dataset("scale_bar_size", data=self.state.scale_bar_size)

        # HDF5 does not have a native boolean type, so we save as int8 and convert it
        # back to boolean on read.
        group.create_dataset("is_tracking", data=np.int8(self.state.is_tracking))
        group.create_dataset("pool_dcr", data=np.int8(self.state.pool_dcr))

        # Store fluorophore names as attributes (JSON-encoded for easy parsing)
        # Format: {"1": "cycle1", "2": "cycle2", ...}
        fluorophore_names = self.processor.fluorophore_names
        if fluorophore_names:
            names_json = json.dumps({str(k): v for k, v in fluorophore_names.items()})
            group.attrs["fluorophore_names"] = names_json

        # Store TID offsets as attributes (JSON-encoded for easy parsing)
        tid_offsets = getattr(self.processor.dataset, "tid_offsets", [])
        if tid_offsets:
            offsets_payload = [
                {"first_iid": int(first_iid), "tid_offset": int(tid_offset)}
                for first_iid, tid_offset in tid_offsets
            ]
            group.attrs["tid_offsets"] = json.dumps(offsets_payload)

# Writer of (processed) MINFLUX into native
`.pmx` format.
Instance variables
prop message-
Expand source code
@property
def message(self):
    """Return the last error message, i.e. the current value of `self._message`."""
    last_message = self._message
    return last_message

# Return last error message.
Methods
def write(self, file_name: pathlib.Path | str) ‑> bool-
Expand source code
def write(self, file_name: Union[Path, str]) -> bool:
    """Write the processor's data and parameters to an HDF5 file.

    The file gets the attributes `file_version` and `reader_version` and
    the groups `raw`, `parameters` and `paraview`.

    Parameters
    ----------
    file_name: Union[Path, str]
        Destination file, opened with mode "w".

    Returns
    -------
    bool
        True on success (the stored error message is reset to an empty
        string). False if anything raised; the text of the exception is
        then stored as the error message.
    """
    try:
        with h5py.File(file_name, "w") as h5_file:
            # File-level metadata: format version and MinFluxReader version
            h5_file.attrs["file_version"] = "3.0"
            h5_file.attrs["reader_version"] = self.processor.reader.version

            # One group per kind of content
            raw_group = h5_file.create_group("raw")
            params_group = h5_file.create_group("parameters")
            pv_group = h5_file.create_group("paraview")

            # Raw data: a version 1 reader stores filtered_raw_data_array
            # as "raw/npy" for backwards compatibility; a version 2 reader
            # stores the raw dataframe as "raw/df".
            if self.processor.reader.version == 1:
                raw_array = self.processor.filtered_raw_data_array
                raw_group.create_dataset("npy", data=raw_array, compression="gzip")
            elif self.processor.reader.version == 2:
                raw_frame = self.processor.filtered_raw_dataframe
                self._store_dataframe(raw_group, raw_frame, "df")
            else:
                raise ValueError("Unexpected reader version!")

            # Filtered dataframe, kept for compatibility with ParaView
            self._store_dataframe(
                pv_group, self.processor.filtered_dataframe, "dataframe"
            )

            # Important parameters
            self._store_parameters(params_group)
    except Exception as err:
        # Remember what went wrong and report failure
        self._message = str(err)
        return False
    else:
        # Clear any previous error and report success
        self._message = ""
        return True