Coverage for /home/runner/work/torchcvnn/torchcvnn/src/torchcvnn/datasets/slc/slc_file.py: 0% of 47 statements

# MIT License

# Copyright (c) 2024 Jeremy Fix

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# Standard imports
import os
import pathlib

# External imports
import numpy as np

# Local imports
from .ann_file import AnnFile


def parse_slc_filename(filename):
    """
    Parses the filename of an SLC file

    {site name}_{line ID}_{flight ID}_{data take counter}_{acquisition date}_{band}{steering}{polarization}_{stack_version}... _{baseline correction}_{segment number}_{downsample factor}.slc

    and returns all the information in a dictionary.
    """
    # Remove the .slc extension and split the fields
    fields = filename[:-4].split("_")
    parameters = {
        "site_name": fields[0],
        "line_ID": fields[1],
        "flight_ID": fields[2],
        "data_take_counter": fields[3],
        "acquisition_date": fields[4],
        "band": fields[5][0],
        "steering": fields[5][1:-2],
        "polarization": fields[5][-2:],
        "stack_version": fields[6],
        "baseline_correction": fields[7],
        "segment_number": int(
            fields[8][1:]
        ),  # the segment is encoded as s{segment_number}
        "downsample_factor": fields[9],
    }
    return parameters
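

# Illustrative check of the parser, kept as a comment so the module behaviour is
# unchanged; the sample filename comes from the SLCFile docstring below:
#
#   parse_slc_filename("SSurge_15305_14170_007_141120_L090HH_01_BC_s1_1x1.slc")
#   # -> {"site_name": "SSurge", "line_ID": "15305", "flight_ID": "14170",
#   #     "data_take_counter": "007", "acquisition_date": "141120", "band": "L",
#   #     "steering": "090", "polarization": "HH", "stack_version": "01",
#   #     "baseline_correction": "BC", "segment_number": 1, "downsample_factor": "1x1"}
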

class SLCFile:
    r"""
    Reads an SLC file

    The filenames contain interesting information:

    {site name}_{line ID}_{flight ID}_{data take counter}_{acquisition date}_{band}{steering}{polarization}_{stack_version}... _{baseline correction}_{segment number}_{downsample factor}.slc

    e.g. SSurge_15305_14170_007_141120_L090HH_01_BC_s1_1x1.slc is

    - site_name : SSurge
    - line ID : 15305
    - flight ID : 14170
    - data take counter : 007
    - acquisition date : 141120, the date is in YYMMDD format (UTC time).
    - band : L
    - steering : 090
    - polarization : HH
    - stack version : 01
    - baseline correction : BC, means the data is corrected for residual baseline
    - segment number : s1
    - downsample factor : 1x1

    There is one SLC file per segment and per polarization.
    """

    def __init__(self, filename: str, patch_size: tuple, patch_stride: tuple = None):
        self.filename = pathlib.Path(filename)
        self.parameters = parse_slc_filename(self.filename.name)
        self.patch_size = patch_size
        self.patch_stride = patch_stride
        if self.patch_stride is None:
            self.patch_stride = patch_size

        # The annotation filename is almost the same as the SLC filename, except we drop
        # the segment number and downsample factor
        # We expect it to be colocated with the SLC file
        ann_filename = "_".join(str(self.filename.name).split("_")[:-2]) + ".ann"
        self.ann_file = AnnFile(str(self.filename.parent / ann_filename))
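
        # Illustrative example, derived from the sample filename in the class
        # docstring: for SSurge_15305_14170_007_141120_L090HH_01_BC_s1_1x1.slc,
        # ann_filename is SSurge_15305_14170_007_141120_L090HH_01_BC.ann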

        downsample_factor = self.parameters["downsample_factor"]
        segment_number = self.parameters["segment_number"]
        # self.azimuth_pixel_spacing = getattr(
        #     self.ann_file, f"{downsample_factor}_slc_azimuth_pixel_spacing"
        # )
        # self.range_pixel_spacing = getattr(
        #     self.ann_file, f"{downsample_factor}_slc_range_pixel_spacing"
        # )
        # self.global_average_squint_angle = self.ann_file.global_average_squint_angle
        # self.center_wavelength = self.ann_file.center_wavelength
        self.n_rows = getattr(
            self.ann_file, f"slc_{segment_number}_{downsample_factor}_rows"
        )
        self.n_cols = getattr(
            self.ann_file, f"slc_{segment_number}_{downsample_factor}_columns"
        )

        # Precompute the dimensions of the grid of patches
        nrows_patch, ncols_patch = self.patch_size
        row_stride, col_stride = self.patch_stride

        self.nsamples_per_rows = (self.n_rows - nrows_patch) // row_stride + 1
        self.nsamples_per_cols = (self.n_cols - ncols_patch) // col_stride + 1
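
        # Worked example with assumed numbers (not taken from a real annotation
        # file): if n_rows = 1000, patch_size = (128, 128) and
        # patch_stride = (64, 64), then
        # nsamples_per_rows = (1000 - 128) // 64 + 1 = 14 patch positions along
        # the rows; trailing pixels that do not fill a complete patch are dropped.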

    @property
    def key(self):
        return "_".join(
            (
                self.parameters["site_name"],
                self.parameters["line_ID"],
                self.parameters["flight_ID"],
                self.parameters["data_take_counter"],
                self.parameters["acquisition_date"],
                self.parameters["band"],
                self.parameters["steering"],
                self.parameters["stack_version"],
                self.parameters["baseline_correction"],
            )
        )
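        # Illustrative example: for SSurge_15305_14170_007_141120_L090HH_01_BC_s1_1x1.slc,
        # key is "SSurge_15305_14170_007_141120_L_090_01_BC"; the polarization and
        # segment number are left out, so the files of one acquisition share the same key.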

    @property
    def polarization(self):
        return self.parameters["polarization"]

    def __len__(self):
        """
        Returns the number of patches that can be extracted from the SLC file
        """
        return self.nsamples_per_rows * self.nsamples_per_cols

    def __getitem__(self, item):
        """
        Returns the item-th patch from the SLC file
        """
        assert 0 <= item < len(self)
        # Compute the row and column index of the patch in the grid of patches
        row = item // self.nsamples_per_cols
        col = item % self.nsamples_per_cols

        # Compute the starting row and column index of the patch in the image
        row_stride, col_stride = self.patch_stride
        row_start = row * row_stride
        col_start = col * col_stride
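
        # Worked example with assumed numbers: if nsamples_per_cols = 4 and
        # patch_stride = (64, 64), then item = 5 maps to (row, col) = (1, 1)
        # and the patch starts at (row_start, col_start) = (64, 64).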

        # Read the patch
        # The SLC file is a raw binary file of complex64 values (8 bytes per sample)
        patch = np.zeros(self.patch_size, dtype=np.complex64)
        with open(self.filename, "rb") as fh:
            for patch_row in range(self.patch_size[0]):
                # Seek to the first requested column of the current image row
                fh.seek(
                    (row_start + patch_row) * self.n_cols * 8 + col_start * 8,
                    os.SEEK_SET,
                )
                patch[patch_row, :] = np.fromfile(
                    fh, dtype=np.complex64, count=self.patch_size[1]
                )
        return patch
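

# Minimal usage sketch (illustrative only, not part of the library): the .slc
# path, patch size and stride below are assumptions; a matching .ann annotation
# file is expected to sit next to the .slc file, as read in SLCFile.__init__.
if __name__ == "__main__":
    slc = SLCFile(
        "/path/to/SSurge_15305_14170_007_141120_L090HH_01_BC_s1_1x1.slc",  # hypothetical path
        patch_size=(512, 512),
        patch_stride=(256, 256),
    )
    print(f"{len(slc)} patches, polarization {slc.polarization}, key {slc.key}")
    patch = slc[0]  # complex64 ndarray of shape (512, 512)
    print(patch.dtype, patch.shape)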