import tempfile
from typing import List, Union

+import cloudpickle
import fsspec
import oracledb
import pandas as pd
@@ -126,7 +127,26 @@ def load_data(data_spec, storage_options=None, **kwargs):
    return data


+def _safe_write(fn, **kwargs):
+    try:
+        return fn(**kwargs)
+    except Exception:
+        logger.warning(f'Failed to write file {kwargs.get("filename", "UNKNOWN")}')
+
+
def write_data(data, filename, format, storage_options=None, index=False, **kwargs):
+    return _safe_write(
+        fn=_write_data,
+        data=data,
+        filename=filename,
+        format=format,
+        storage_options=storage_options,
+        index=index,
+        **kwargs,
+    )
+
+
+def _write_data(data, filename, format, storage_options=None, index=False, **kwargs):
    disable_print()
    if not format:
        _, format = os.path.splitext(filename)
@@ -143,11 +163,24 @@ def write_data(data, filename, format, storage_options=None, index=False, **kwar


def write_json(json_dict, filename, storage_options=None):
+    return _safe_write(
+        fn=_write_json,
+        json_dict=json_dict,
+        filename=filename,
+        storage_options=storage_options,
+    )
+
+
+def _write_json(json_dict, filename, storage_options=None):
    with fsspec.open(filename, mode="w", **storage_options) as f:
        f.write(json.dumps(json_dict))


def write_simple_json(data, path):
+    return _safe_write(fn=_write_simple_json, data=data, path=path)
+
+
+def _write_simple_json(data, path):
    if ObjectStorageDetails.is_oci_path(path):
        storage_options = default_signer()
    else:
@@ -156,6 +189,60 @@ def write_simple_json(data, path):
        json.dump(data, f, indent=4)


+def write_file(local_filename, remote_filename, storage_options, **kwargs):
+    return _safe_write(
+        fn=_write_file,
+        local_filename=local_filename,
+        remote_filename=remote_filename,
+        storage_options=storage_options,
+        **kwargs,
+    )
+
+
+def _write_file(local_filename, remote_filename, storage_options, **kwargs):
+    with open(local_filename) as f1:
+        with fsspec.open(
+            remote_filename,
+            "w",
+            **storage_options,
+        ) as f2:
+            f2.write(f1.read())
+
+
+def load_pkl(filepath):
+    return _safe_write(fn=_load_pkl, filepath=filepath)
+
+
+def _load_pkl(filepath):
+    storage_options = {}
+    if ObjectStorageDetails.is_oci_path(filepath):
+        storage_options = default_signer()
+
+    with fsspec.open(filepath, "rb", **storage_options) as f:
+        return cloudpickle.load(f)
+    return None
+
+
+def write_pkl(obj, filename, output_dir, storage_options):
+    return _safe_write(
+        fn=_write_pkl,
+        obj=obj,
+        filename=filename,
+        output_dir=output_dir,
+        storage_options=storage_options,
+    )
+
+
+def _write_pkl(obj, filename, output_dir, storage_options):
+    pkl_path = os.path.join(output_dir, filename)
+    with fsspec.open(
+        pkl_path,
+        "wb",
+        **storage_options,
+    ) as f:
+        cloudpickle.dump(obj, f)
+
+
def merge_category_columns(data, target_category_columns):
    result = data.apply(
        lambda x: "__".join([str(x[col]) for col in target_category_columns]), axis=1
@@ -290,4 +377,8 @@ def disable_print():

# Restore
def enable_print():
+    try:
+        sys.stdout.close()
+    except Exception:
+        pass
    sys.stdout = sys.__stdout__
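
For context, a minimal sketch of how the new pickle helpers could be exercised. The import paths, the use of default_signer, and the oci:// bucket URI are assumptions for illustration only, not part of this diff:

# Illustrative sketch; module path and OCI URI below are assumed placeholders.
from ads.opctl.operator.lowcode.common.utils import load_pkl, write_pkl
from ads.common.auth import default_signer

model = {"weights": [0.1, 0.2, 0.3]}  # any picklable object

# Persist to Object Storage; _safe_write logs a warning instead of raising on failure.
write_pkl(
    obj=model,
    filename="model.pkl",
    output_dir="oci://my-bucket@my-namespace/artifacts",  # placeholder URI
    storage_options=default_signer(),
)

# Read it back; _load_pkl derives storage_options from the path itself.
restored = load_pkl("oci://my-bucket@my-namespace/artifacts/model.pkl")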