diff --git a/execution.py b/execution.py
index c3a62f1cbc79..daf61ba199da 100644
--- a/execution.py
+++ b/execution.py
@@ -1116,6 +1116,144 @@ def get_history(self, prompt_id=None, max_items=None, offset=-1):
             else:
                 return {}
 
+    def get_ordered_history(self, max_items=None, offset=0):
+        """
+        Retrieves execution history in chronological order with pagination support.
+        Returns a lightweight list of history objects.
+        Used by the /history_v2 endpoint.
+
+        API Output Structure:
+        {
+            "history": [
+                {
+                    "prompt_id": str,  # Unique identifier for this execution
+                    "outputs": dict,   # Node outputs {node_id: ui_data}
+                    "meta": dict,      # Node metadata {node_id: {node_id, display_node, parent_node, real_node_id}}
+                    "prompt": {
+                        "priority": int,    # Execution priority
+                        "prompt_id": str,   # Same as root prompt_id
+                        "extra_data": dict  # Additional metadata (workflow removed from extra_pnginfo)
+                    } | None,  # None if no prompt data available
+                    "status": {
+                        "status_str": str,  # "success" | "error"
+                        "messages": [       # Filtered execution event messages
+                            (event_name: str, event_data: dict)
+                        ]
+                    } | None  # None if no status recorded
+                },
+                # ... more history items
+            ]
+        }
+
+        Parameters:
+        - max_items: Maximum number of items to return (None = all)
+        - offset: Starting index (0-based, negative values calculated from end)
+        """
+        with self.mutex:
+            history_keys = list(self.history.keys())
+
+            if offset < 0 and max_items is not None:
+                offset = max(0, len(history_keys) - max_items)
+
+            end_index = offset + max_items if max_items is not None else None
+            selected_keys = history_keys[offset:end_index]
+
+            history_items = []
+            for key in selected_keys:
+                history_entry = self.history[key]
+
+                filtered_prompt = None
+                if "prompt" in history_entry:
+                    priority, prompt_id, _, extra_data, _ = history_entry["prompt"]
+
+                    filtered_extra_data = {}
+                    for k, v in extra_data.items():
+                        if k == "extra_pnginfo":
+                            filtered_extra_data[k] = {
+                                pk: pv for pk, pv in v.items()
+                                if pk != "workflow"
+                            }
+                        else:
+                            filtered_extra_data[k] = v
+
+                    filtered_prompt = {
+                        "priority": priority,
+                        "prompt_id": prompt_id,
+                        "extra_data": filtered_extra_data
+                    }
+
+                status = None
+                if history_entry.get("status"):
+                    status = {
+                        "status_str": history_entry["status"]["status_str"],
+                        "messages": [(e, {k: v for k, v in d.items() if k != "nodes"})
+                                     if e == "execution_cached" else (e, d)
+                                     for e, d in history_entry["status"]["messages"]]
+                    }
+
+                item = {
+                    "prompt_id": key,
+                    "outputs": history_entry.get("outputs", {}),
+                    "meta": history_entry.get("meta", {}),
+                    "prompt": filtered_prompt,
+                    "status": status
+                }
+
+                history_items.append(item)
+
+            return {"history": history_items}
+
+    def get_history_v2(self, prompt_id):
+        """
+        Retrieves execution history for a specific prompt ID.
+        Used by /history_v2/{prompt_id}
+
+        API Output Structure:
+        {
+            "<prompt_id>": {
+                "prompt": {
+                    "priority": int,            # Execution priority
+                    "prompt_id": str,           # Same as the key
+                    "prompt": dict,             # The workflow/node data
+                    "extra_data": dict,         # Additional metadata (client_id, etc.)
+                    "outputs_to_execute": list  # Node IDs to execute
+                },
+                "outputs": dict,  # Node outputs {node_id: ui_data}
+                "meta": dict,     # Node metadata {node_id: {node_id, display_node, parent_node, real_node_id}}
+                "status": {
+                    "status_str": str,  # "success" | "error"
+                    "completed": bool,  # Whether execution finished
+                    "messages": list    # Execution event messages
+                } | None  # None if no status recorded
+            }
+        }
+
+        Returns empty dict {} if prompt_id not found.
+        """
+        with self.mutex:
+            if prompt_id in self.history:
+                history_entry = self.history[prompt_id]
+
+                new_entry = {}
+
+                if "prompt" in history_entry:
+                    priority, prompt_id_inner, prompt_data, extra_data, outputs_to_execute = history_entry["prompt"]
+                    new_entry["prompt"] = {
+                        "priority": priority,
+                        "prompt_id": prompt_id_inner,
+                        "prompt": prompt_data,
+                        "extra_data": extra_data,
+                        "outputs_to_execute": outputs_to_execute
+                    }
+
+                for key, value in history_entry.items():
+                    if key != "prompt":
+                        new_entry[key] = value
+
+                return {prompt_id: new_entry}
+            else:
+                return {}
+
     def wipe_history(self):
         with self.mutex:
             self.history = {}
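The negative-offset handling in get_ordered_history is easy to misread, so here is the pagination arithmetic pulled out into a minimal standalone sketch. The names `keys` and `paginate` are illustrative only and not part of the patch; the real method slices `list(self.history.keys())` while holding `self.mutex`:

# Standalone sketch of get_ordered_history's pagination arithmetic.
# "keys" stands in for list(self.history.keys()); the values are illustrative.
keys = ["p1", "p2", "p3", "p4", "p5"]

def paginate(keys, max_items=None, offset=0):
    # A negative offset with max_items means "the last max_items entries".
    if offset < 0 and max_items is not None:
        offset = max(0, len(keys) - max_items)
    end_index = offset + max_items if max_items is not None else None
    return keys[offset:end_index]

assert paginate(keys) == keys                                  # no limits: everything
assert paginate(keys, max_items=2, offset=2) == ["p3", "p4"]   # a middle page
assert paginate(keys, max_items=2, offset=-1) == ["p4", "p5"]  # tail via negative offset
assert paginate(keys, max_items=2, offset=10) == []            # past the end: empty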
+ """ + with self.mutex: + if prompt_id in self.history: + history_entry = self.history[prompt_id] + + new_entry = {} + + if "prompt" in history_entry: + priority, prompt_id_inner, prompt_data, extra_data, outputs_to_execute = history_entry["prompt"] + new_entry["prompt"] = { + "priority": priority, + "prompt_id": prompt_id_inner, + "prompt": prompt_data, + "extra_data": extra_data, + "outputs_to_execute": outputs_to_execute + } + + for key, value in history_entry.items(): + if key != "prompt": + new_entry[key] = value + + return {prompt_id: new_entry} + else: + return {} + def wipe_history(self): with self.mutex: self.history = {} diff --git a/server.py b/server.py index 71a58f0fa9e6..1fa914e8b747 100644 --- a/server.py +++ b/server.py @@ -652,6 +652,22 @@ async def get_history_prompt_id(request): prompt_id = request.match_info.get("prompt_id", None) return web.json_response(self.prompt_queue.get_history(prompt_id=prompt_id)) + @routes.get("/history_v2") + async def get_ordered_history(request): + max_items = request.rel_url.query.get("max_items", None) + if max_items is not None: + max_items = int(max_items) + + offset = request.rel_url.query.get("offset", 0) + offset = int(offset) + + return web.json_response(self.prompt_queue.get_ordered_history(max_items=max_items, offset=offset)) + + @routes.get("/history_v2/{prompt_id}") + async def get_history_v2_prompt_id(request): + prompt_id = request.match_info.get("prompt_id", None) + return web.json_response(self.prompt_queue.get_history_v2(prompt_id=prompt_id)) + @routes.get("/queue") async def get_queue(request): queue_info = {} diff --git a/tests/inference/test_execution.py b/tests/inference/test_execution.py index 9d3d685ccaf2..b5f41c70d6f3 100644 --- a/tests/inference/test_execution.py +++ b/tests/inference/test_execution.py @@ -63,10 +63,39 @@ def get_image(self, filename, subfolder, folder_type): with urllib.request.urlopen("http://{}/view?{}".format(self.server_address, url_values)) as response: return response.read() - def get_history(self, prompt_id): - with urllib.request.urlopen("http://{}/history/{}".format(self.server_address, prompt_id)) as response: + def get_history(self, prompt_id=None, max_items=None): + if prompt_id: + url = "http://{}/history/{}".format(self.server_address, prompt_id) + else: + url = "http://{}/history".format(self.server_address) + if max_items is not None: + url += "?max_items={}".format(max_items) + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + + def get_ordered_history(self, max_items=None, offset=None): + url = "http://{}/history_v2".format(self.server_address) + params = {} + if max_items is not None: + params['max_items'] = str(max_items) + if offset is not None: + params['offset'] = str(offset) + if params: + url += "?" 
diff --git a/tests/inference/test_execution.py b/tests/inference/test_execution.py
index 9d3d685ccaf2..b5f41c70d6f3 100644
--- a/tests/inference/test_execution.py
+++ b/tests/inference/test_execution.py
@@ -63,10 +63,39 @@ def get_image(self, filename, subfolder, folder_type):
         with urllib.request.urlopen("http://{}/view?{}".format(self.server_address, url_values)) as response:
             return response.read()
 
-    def get_history(self, prompt_id):
-        with urllib.request.urlopen("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
+    def get_history(self, prompt_id=None, max_items=None):
+        if prompt_id:
+            url = "http://{}/history/{}".format(self.server_address, prompt_id)
+        else:
+            url = "http://{}/history".format(self.server_address)
+            if max_items is not None:
+                url += "?max_items={}".format(max_items)
+        with urllib.request.urlopen(url) as response:
+            return json.loads(response.read())
+
+    def get_ordered_history(self, max_items=None, offset=None):
+        url = "http://{}/history_v2".format(self.server_address)
+        params = {}
+        if max_items is not None:
+            params['max_items'] = str(max_items)
+        if offset is not None:
+            params['offset'] = str(offset)
+        if params:
+            url += "?" + urllib.parse.urlencode(params)
+        with urllib.request.urlopen(url) as response:
             return json.loads(response.read())
 
+    def get_history_v2_for_prompt(self, prompt_id):
+        url = "http://{}/history_v2/{}".format(self.server_address, prompt_id)
+        with urllib.request.urlopen(url) as response:
+            return json.loads(response.read())
+
+    def clear_history(self):
+        data = json.dumps({"clear": True}).encode('utf-8')
+        req = urllib.request.Request("http://{}/history".format(self.server_address), data=data)
+        req.add_header('Content-Type', 'application/json')
+        urllib.request.urlopen(req)
+
     def set_test_name(self, name):
         self.test_name = name
@@ -585,3 +614,246 @@ def test_execution_block_list_output(self, client: ComfyClient, builder: GraphBuilder):
         assert len(images) == 2, "Should have 2 images"
         assert numpy.array(images[0]).min() == 0 and numpy.array(images[0]).max() == 0, "First image should be black"
         assert numpy.array(images[1]).min() == 0 and numpy.array(images[1]).max() == 0, "Second image should also be black"
+
+    def test_ordered_history_endpoint(self, client: ComfyClient, builder: GraphBuilder):
+        """Test the ordered history endpoint returns data in chronological order."""
+        # Clear history to start fresh
+        client.clear_history()
+
+        # Run multiple prompts to test ordering
+        prompt_ids = []
+        for _ in range(3):
+            g = GraphBuilder()  # Create fresh GraphBuilder for each run
+            input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
+            g.node("SaveImage", images=input1.out(0))
+
+            result = client.run(g)
+            prompt_ids.append(result.get_prompt_id())
+            time.sleep(0.1)  # Small delay to ensure different timestamps
+
+        # Test ordered history endpoint
+        ordered_history = client.get_ordered_history()
+        assert "history" in ordered_history, "Ordered history should have history key"
+        assert isinstance(ordered_history["history"], list), "Ordered history should be a list"
+        assert len(ordered_history["history"]) == 3, "Should have exactly 3 prompts in history"
+
+        # Verify chronological ordering (oldest first)
+        history_prompt_ids = [item["prompt_id"] for item in ordered_history["history"]]
+
+        # Should be in chronological order (oldest first, as they're added in completion order)
+        assert history_prompt_ids == prompt_ids, "History should be in chronological order"
+
+    def test_history_prompt_id_endpoint(self, client: ComfyClient, builder: GraphBuilder):
+        """Test fetching specific prompt history by ID."""
+        g = builder
+        input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1)
+        g.node("SaveImage", images=input1.out(0))
+
+        result = client.run(g)
+        prompt_id = result.get_prompt_id()
+
+        # Test legacy history endpoint for specific prompt
+        legacy_history = client.get_history(prompt_id)
+        assert prompt_id in legacy_history, "Legacy history should contain prompt ID"
+        assert "outputs" in legacy_history[prompt_id], "Legacy history should have outputs"
+
+        # Test history_v2 endpoint for specific prompt
+        specific_history = client.get_history_v2_for_prompt(prompt_id)
+        assert prompt_id in specific_history, "History v2 should contain prompt ID"
+
+        # Verify key fields match between legacy and v2
+        v2_data = specific_history[prompt_id]
+        legacy_data = legacy_history[prompt_id]
+
+        # Check that outputs and status match
+        assert v2_data["outputs"] == legacy_data["outputs"], "Outputs should match"
+        assert v2_data["status"] == legacy_data["status"], "Status should match"
+
+        # Verify prompt is converted to dict format in v2
+        assert isinstance(v2_data["prompt"], dict), "Prompt should be a dictionary in v2"
+        assert "prompt_id" in v2_data["prompt"], "Prompt dict should have prompt_id"
+        assert "priority" in v2_data["prompt"], "Prompt dict should have priority"
+        assert "prompt" in v2_data["prompt"], "Prompt dict should have prompt data"
+        assert "extra_data" in v2_data["prompt"], "Prompt dict should have extra_data"
+        assert "outputs_to_execute" in v2_data["prompt"], "Prompt dict should have outputs_to_execute"
"prompt_id" in v2_data["prompt"], "Prompt dict should have prompt_id" + assert "priority" in v2_data["prompt"], "Prompt dict should have priority" + assert "prompt" in v2_data["prompt"], "Prompt dict should have prompt data" + assert "extra_data" in v2_data["prompt"], "Prompt dict should have extra_data" + assert "outputs_to_execute" in v2_data["prompt"], "Prompt dict should have outputs_to_execute" + + def test_history_max_items(self, client: ComfyClient): + """Test legacy history endpoint with max_items parameter.""" + # Clear history to start fresh + client.clear_history() + + # Run multiple prompts to test pagination + for _ in range(5): + g = GraphBuilder() # Create fresh GraphBuilder for each run + input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1) + g.node("SaveImage", images=input1.out(0)) + + client.run(g) + time.sleep(0.1) # Small delay to ensure different timestamps + + # Test max_items parameter on legacy history endpoint + limited_history = client.get_history(max_items=2) + assert len(limited_history) == 2, "History should return exactly max_items" + + def test_ordered_history_max_items_and_offset(self, client: ComfyClient): + """Test ordered history endpoint with max_items and offset parameters.""" + # Clear history to start fresh + client.clear_history() + + # Run multiple prompts to test pagination + for _ in range(5): + g = GraphBuilder() # Create fresh GraphBuilder for each run + input1 = g.node("StubImage", content="BLACK", height=512, width=512, batch_size=1) + g.node("SaveImage", images=input1.out(0)) + + client.run(g) + time.sleep(0.1) # Small delay to ensure different timestamps + + # Test max_items parameter on ordered history endpoint + limited_ordered = client.get_ordered_history(max_items=3) + assert "history" in limited_ordered, "Limited ordered history should have history key" + assert len(limited_ordered["history"]) == 3, "Ordered history should return exactly max_items" + + # Test pagination with offset as cursor + full_history = client.get_ordered_history() + assert len(full_history["history"]) == 5, "Should have 5 items in full history" + + # Extract prompt IDs from full history for comparison + full_prompt_ids = [item["prompt_id"] for item in full_history["history"]] + + # Test proper pagination behavior with offset as cursor + + # Test first page: offset=0, max_items=2 + page1 = client.get_ordered_history(max_items=2, offset=0) + assert len(page1["history"]) == 2, "First page should have 2 items" + page1_ids = [item["prompt_id"] for item in page1["history"]] + assert page1_ids == full_prompt_ids[0:2], "First page should contain items at indices 0-1" + + # Test second page: offset=2, max_items=2 + page2 = client.get_ordered_history(max_items=2, offset=2) + assert len(page2["history"]) == 2, "Second page should have 2 items" + page2_ids = [item["prompt_id"] for item in page2["history"]] + assert page2_ids == full_prompt_ids[2:4], "Second page should contain items at indices 2-3" + + # Test third page: offset=4, max_items=2 + page3 = client.get_ordered_history(max_items=2, offset=4) + assert len(page3["history"]) == 1, "Third page should have 1 remaining item" + page3_ids = [item["prompt_id"] for item in page3["history"]] + assert page3_ids == full_prompt_ids[4:5], "Third page should contain item at index 4" + + # Verify no overlap between pages + all_paginated_ids = page1_ids + page2_ids + page3_ids + assert len(set(all_paginated_ids)) == 5, "All paginated IDs should be unique" + assert set(all_paginated_ids) == 
+        # Test default behavior: get first N items (no offset specified)
+        # When offset is not specified, it defaults to 0
+        first_2_items = client.get_ordered_history(max_items=2)
+        assert len(first_2_items["history"]) == 2, "Default behavior should return 2 items"
+        first_2_ids = [item["prompt_id"] for item in first_2_items["history"]]
+        # This should be equivalent to offset=0 with max_items=2
+        assert first_2_ids == full_prompt_ids[0:2], "Default behavior should return first 2 items"
+
+        # Test offset beyond available items
+        beyond_offset = client.get_ordered_history(max_items=2, offset=10)
+        assert len(beyond_offset["history"]) == 0, "Offset beyond items should return empty list"
+
+    def test_ordered_history_prompt_field_filtering_unit(self):
+        """Unit test for prompt field filtering logic in get_ordered_history."""
+        from execution import PromptQueue
+
+        # Mock server
+        class MockServer:
+            def queue_updated(self): pass
+
+        # Create queue and add mock history
+        queue = PromptQueue(MockServer())
+
+        # Mock history entry with full prompt structure
+        mock_prompt_tuple = (
+            12345,              # priority/timestamp
+            'test-prompt-123',  # prompt_id
+            {'nodes': {'1': {'class_type': 'SaveImage'}}},  # workflow (should be filtered)
+            {  # extra_data
+                'client_id': 'test-client',
+                'extra_pnginfo': {
+                    'workflow': {'nodes': {'1': {'class_type': 'SaveImage'}}},  # Should be filtered out
+                    'version': '1.0'  # Should be preserved
+                }
+            },
+            ['1']  # outputs_to_execute (should be filtered)
+        )
+
+        queue.history['test-prompt-123'] = {
+            'prompt': mock_prompt_tuple,
+            'outputs': {'1': {'images': []}},
+            'status': {
+                'status_str': 'success',
+                'completed': True,  # Should be filtered out
+                'messages': [
+                    ('execution_cached', {'nodes': ['node1', 'node2'], 'timestamp': 1234567890}),  # 'nodes' should be filtered
+                    ('execution_start', {'timestamp': 1234567800})  # Should remain unchanged
+                ]
+            },
+            'meta': {'1': {'node_id': '1'}}
+        }
+
+        # Test get_ordered_history with our filtering
+        result = queue.get_ordered_history()
+
+        # Verify structure
+        assert "history" in result, "Result should have history key"
+        assert len(result["history"]) == 1, "Should have one history item"
+
+        history_item = result["history"][0]
+
+        # Verify prompt_id field is added
+        assert "prompt_id" in history_item, "History item should have prompt_id field"
+        assert history_item["prompt_id"] == 'test-prompt-123', "prompt_id should match"
+
+        # Verify prompt field is filtered
+        filtered_prompt = history_item["prompt"]
+        assert isinstance(filtered_prompt, dict), "Filtered prompt should be a dictionary"
+        assert "priority" in filtered_prompt, "Filtered prompt should have priority"
+        assert "prompt_id" in filtered_prompt, "Filtered prompt should have prompt_id"
+        assert "extra_data" in filtered_prompt, "Filtered prompt should have extra_data"
+
+        # Verify correct elements are preserved
+        assert filtered_prompt["priority"] == 12345, "Priority should be preserved"
+        assert filtered_prompt["prompt_id"] == 'test-prompt-123', "Prompt ID should be preserved"
+
+        # Verify extra_data filtering
+        extra_data = filtered_prompt["extra_data"]
+        assert extra_data['client_id'] == 'test-client', "Client ID should be preserved"
+        assert 'extra_pnginfo' in extra_data, "extra_pnginfo should be present"
+        assert 'workflow' not in extra_data['extra_pnginfo'], "Workflow should be filtered out"
+        assert extra_data['extra_pnginfo']['version'] == '1.0', "Other extra_pnginfo data should be preserved"
+
+        # Verify other fields are unchanged
+        assert history_item["outputs"] == {'1': {'images': []}}, "Outputs should be unchanged"
+        assert history_item["meta"] == {'1': {'node_id': '1'}}, "Meta should be unchanged"
+
+        # Verify status field filtering
+        status = history_item["status"]
+        assert "status_str" in status, "Status should have status_str"
+        assert status["status_str"] == 'success', "Status string should be preserved"
+        assert "completed" not in status, "Completed field should be filtered out"
+        assert "messages" in status, "Status should have messages"
+
+        # Verify message filtering
+        messages = status["messages"]
+        assert len(messages) == 2, "Should have 2 messages"
+
+        # Check execution_cached message has nodes filtered out
+        execution_cached_msg = messages[0]
+        assert execution_cached_msg[0] == 'execution_cached', "First message should be execution_cached"
+        cached_data = execution_cached_msg[1]
+        assert "nodes" not in cached_data, "Nodes field should be filtered from execution_cached messages"
+        assert "timestamp" in cached_data, "Timestamp should be preserved in execution_cached messages"
+        assert cached_data["timestamp"] == 1234567890, "Timestamp value should be correct"
+
+        # Check execution_start message remains unchanged
+        execution_start_msg = messages[1]
+        assert execution_start_msg[0] == 'execution_start', "Second message should be execution_start"
+        start_data = execution_start_msg[1]
+        assert start_data == {'timestamp': 1234567800}, "execution_start message should be unchanged"
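The tests above pin down the paging contract: offset is a 0-based index cursor, max_items is a page size, and an offset past the end yields an empty list. That makes it straightforward to stream the entire history; the following sketch builds on the ComfyClient.get_ordered_history helper from the tests and is not part of the patch:

# Sketch: stream the whole ordered history page by page, using offset
# as a cursor. Assumes a ComfyClient-style object as defined above.
def iter_history(client, page_size=50):
    offset = 0
    while True:
        page = client.get_ordered_history(max_items=page_size, offset=offset)["history"]
        if not page:
            break  # an offset past the end returns an empty list
        yield from page
        offset += len(page)

# Usage: for item in iter_history(client): print(item["prompt_id"])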