clean: remove duplicate run_operation, dead code now
This commit is contained in:
@@ -171,52 +171,3 @@ class Operation:
|
|||||||
context.logger.info(
|
context.logger.info(
|
||||||
f"{self.operation_name}: {total_processed} items processed.")
|
f"{self.operation_name}: {total_processed} items processed.")
|
||||||
return total_processed
|
return total_processed
|
||||||
|
|
||||||
|
|
||||||
def run_operation(
|
|
||||||
strategy,
|
|
||||||
operation_name: str,
|
|
||||||
filter_fn: Optional[Callable[[Any], bool]] = None
|
|
||||||
) -> int:
|
|
||||||
context = OperationContext()
|
|
||||||
progress = ProgressTracker(operation=operation_name)
|
|
||||||
|
|
||||||
context.logger.info(
|
|
||||||
f"Starting {operation_name} with batch_size={context.batch_size}, delay={context.delay}s"
|
|
||||||
)
|
|
||||||
|
|
||||||
cursor = None
|
|
||||||
total_processed = 0
|
|
||||||
batch_num = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
batch_num += 1
|
|
||||||
response = strategy.fetch(context, cursor)
|
|
||||||
items = strategy.extract_items(response)
|
|
||||||
|
|
||||||
if not items:
|
|
||||||
break
|
|
||||||
|
|
||||||
progress.batch(batch_num, len(items))
|
|
||||||
|
|
||||||
for item in items:
|
|
||||||
if filter_fn and not filter_fn(item):
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
strategy.process_item(item, context)
|
|
||||||
total_processed += 1
|
|
||||||
progress.update(1)
|
|
||||||
except Exception as e:
|
|
||||||
context.logger.error(f"Error processing item: {e}")
|
|
||||||
|
|
||||||
cursor = strategy.get_cursor(response)
|
|
||||||
if not cursor:
|
|
||||||
break
|
|
||||||
|
|
||||||
if context.delay > 0:
|
|
||||||
time.sleep(context.delay)
|
|
||||||
|
|
||||||
context.logger.info(
|
|
||||||
f"{operation_name}: {total_processed} items processed.")
|
|
||||||
return total_processed
|
|
||||||
|
|||||||
Reference in New Issue
Block a user