
Pipeline loops fail when BranchJoiner receives multiple inputs #7960

Open
vblagoje opened this issue Jul 1, 2024 · 2 comments

Labels: P1 High priority, add to the next sprint · type:bug Something isn't working

Comments


vblagoje commented Jul 1, 2024

Pipeline loops in Haystack currently fail when the prompt_concatenator_after_observation component (see the attached pipeline graph) loops the ChatMessage list back to the main_input BranchJoiner. The BranchJoiner fails with the following error message:

  File "/Users/vblagoje/workspace/haystack/haystack/core/pipeline/pipeline.py", line 76, in _run_component
    res: Dict[str, Any] = instance.run(**inputs)
  File "/Users/vblagoje/workspace/haystack/haystack/components/joiners/branch.py", line 140, in run
    raise ValueError(f"BranchJoiner expects only one input, but {inputs_count} were received.")
ValueError: BranchJoiner expects only one input, but 2 were received.

[pipeline graph: looping_pipeline]

This issue seems to originate in the BranchJoiner receiving both the initial input and the looped-back input simultaneously, violating its precondition of a single input.
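
To see the precondition in isolation, the component can be invoked directly. Below is a minimal sketch; it assumes BranchJoiner's variadic input arrives as a list under value when run is called outside a pipeline, which is what the traceback above suggests:

from haystack.components.joiners import BranchJoiner

joiner = BranchJoiner(int)

# One value per invocation satisfies the precondition
print(joiner.run(value=[1]))  # {'value': 1}

# Two values at once reproduce the error from the traceback:
# ValueError: BranchJoiner expects only one input, but 2 were received.
joiner.run(value=[1, 2])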

Steps to reproduce:

import os
from typing import List, Optional, Dict, Any
import re
from haystack.dataclasses import ChatMessage

from haystack import Document, component
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.routers import ConditionalRouter
from haystack.components.joiners import BranchJoiner
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.websearch import SerperDevWebSearch
from haystack import Pipeline
from haystack.utils import Secret

os.environ["OPENAI_API_KEY"] = "some-fake-key-replace-with-real-if-you-need-to-use-it"


def find_last_action(chat_messages: List[ChatMessage]):
    # Scan the last message bottom-up for the most recent "Action: tool[param]" line
    prompt: str = chat_messages[-1].content
    pattern = r'Action:\s*(\w+)\[(.*?)\]'
    for line in reversed(prompt.strip().split('\n')):
        match = re.search(pattern, line)
        if match:
            return [match.group(1), match.group(2)]
    return [None, None]


def concat_prompt(last_message: ChatMessage, current_prompt: List[ChatMessage], append: str):
    # Append the latest reply (plus an optional suffix) to the running prompt
    return [ChatMessage.from_user(current_prompt[-1].content + last_message.content + append)]


search_message_template = """
Given these web search results:

{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

Be as brief as possible, max one sentence. 
Answer the question: {{search_query}}
"""

react_message_template = """
Solve a question answering task with interleaving Thought, Action, Observation steps.

Thought reasons about the current situation

Action can be:
google_search - Searches Google for the exact concept/entity (given in square brackets) and returns the results for you to use
finish - Returns the final answer (given in square brackets) and finishes the task

Observation summarizes the Action outcome and helps in formulating the next
Thought in Thought, Action, Observation interleaving triplet of steps.

After each Observation, provide the next Thought and next Action.
Don't execute multiple steps even though you know the answer.
Only generate Thought and Action, never Observation, you'll get Observation from Action.
Follow the pattern in the example below.

Example:
###########################
Question: Which magazine was started first Arthur’s Magazine or First for Women?
Thought: I need to search Arthur’s Magazine and First for Women, and find which was started
first.
Action: google_search[When was 'Arthur’s Magazine' started?]
Observation: Arthur’s Magazine was an American literary periodical
published in Philadelphia and founded in 1844. Edited by Timothy Shay Arthur, it featured work by
Edgar A. Poe, J.H. Ingraham, Sarah Josepha Hale, Thomas G. Spear, and others. In May 1846
it was merged into Godey’s Lady’s Book.
Thought: Arthur’s Magazine was started in 1844. I need to search First for Women founding date next
Action: google_search[When was 'First for Women' magazine started?]
Observation: First for Women is a woman’s magazine published by Bauer Media Group in the
USA. The magazine was started in 1989. It is based in Englewood Cliffs, New Jersey. In 2011
the circulation of the magazine was 1,310,696 copies.
Thought: First for Women was started in 1989. 1844 (Arthur’s Magazine) < 1989 (First for
Women), so Arthur’s Magazine was started first.
Action: finish[Arthur’s Magazine]
############################

Let's start, the question is: {{query}}

Thought:
"""

routes = [
    {
        "condition": "{{'search' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "search",
        "output_type": str,
    },
    {
        "condition": "{{'finish' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "finish",
        "output_type": str,
    }
]
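# These two routes drive the ReAct control flow: an extracted google_search[...]
# action is routed to web search, while finish[...] carries the final answer
# out of the loop.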


# Pass-through component used to route the user query into the prompt builder
@component
class NoOp:
    @component.output_types(output=str)
    def run(self, query: str):
        return {"output": query}


# Stub generator: returns a canned Thought/Action reply, so no OpenAI call is made
class FakeThoughtActionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Thought: thinking\n Action: google_search[not important]\n")]}


# Stub generator: returns a canned final answer, so no OpenAI call is made
class FakeConclusionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Tower of Pisa is 55 meters tall\n")]}


# Stub web search: returns two fixed documents, so no SerperDev call is made
class FakeSerperDevWebSearch(SerperDevWebSearch):

    @component.output_types(documents=List[Document])
    def run(self, query: str):
        return {"documents": [Document(content="Eiffel Tower is 300 meters tall"),
                              Document(content="Tower of Pisa is 55 meters tall")]}


# main part
pipeline = Pipeline()
pipeline.add_component("main_input", BranchJoiner(List[ChatMessage]))
pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query"]))
pipeline.add_component("llm", FakeThoughtActionOpenAIChatGenerator(generation_kwargs={"stop": "Observation:"}))
pipeline.add_component("noop", NoOp())

# tools
pipeline.add_component("tool_extractor", OutputAdapter("{{messages | find_action}}",
                                                       output_type=List[str],
                                                       custom_filters={"find_action": find_last_action}))

pipeline.add_component("prompt_concatenator_after_action",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt,'')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

pipeline.add_component("router", ConditionalRouter(routes))
pipeline.add_component("router_search",
                       FakeSerperDevWebSearch(api_key=Secret.from_token("some_fake_api_key")))
pipeline.add_component("search_prompt_builder",
                       DynamicChatPromptBuilder(runtime_variables=["documents", "search_query"]))
pipeline.add_component("search_llm", FakeConclusionOpenAIChatGenerator())
pipeline.add_component("router_finish", OutputAdapter("{{final_answer | format_final_answer}}",
                                                      output_type=str,
                                                      custom_filters={"format_final_answer": lambda x: x}))

pipeline.add_component("search_output_adapter", OutputAdapter("{{search_replies | format_observation}}",
                                                              output_type=List[ChatMessage],
                                                              custom_filters={"format_observation": lambda x: [
                                                                  ChatMessage.from_assistant(
                                                                      "Observation: " + x[-1].content + "\n")]}))

pipeline.add_component("prompt_concatenator_after_observation",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt, '\nThought:')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

# main
pipeline.connect("main_input", "prompt_builder.prompt_source")
pipeline.connect("noop", "prompt_builder.query")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "prompt_concatenator_after_action.replies")

# tools
pipeline.connect("prompt_builder.prompt", "prompt_concatenator_after_action.current_prompt")
pipeline.connect("prompt_concatenator_after_action", "tool_extractor.messages")

pipeline.connect("tool_extractor", "router")
pipeline.connect("router.search", "router_search.query")
pipeline.connect("router_search.documents", "search_prompt_builder.documents")
pipeline.connect("router.search", "search_prompt_builder.search_query")
pipeline.connect("search_prompt_builder.prompt", "search_llm.messages")
pipeline.connect("router.finish", "router_finish")

pipeline.connect("search_llm.replies", "search_output_adapter.search_replies")
pipeline.connect("search_output_adapter", "prompt_concatenator_after_observation.replies")
pipeline.connect("prompt_concatenator_after_action", "prompt_concatenator_after_observation.current_prompt")
pipeline.connect("prompt_concatenator_after_observation", "main_input")

search_message = [ChatMessage.from_user(search_message_template)]
messages = [ChatMessage.from_user(react_message_template)]
question = "which tower is taller: eiffel tower or tower of pisa?"
res = pipeline.run(data={"main_input": {"value": messages},
                         "noop": {"query": question},
                         "search_prompt_builder": {"prompt_source": search_message}})

print(res)

Expected behavior:
The pipeline should handle loops correctly, allowing the BranchJoiner to process looped inputs sequentially rather than simultaneously.
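
For comparison, below is a minimal sketch of the loop pattern this pipeline relies on. The Increment component and all names here are hypothetical, used only to illustrate the expected one-value-per-iteration flow through BranchJoiner in a cycle:

from haystack import Pipeline, component
from haystack.components.joiners import BranchJoiner
from haystack.components.routers import ConditionalRouter

# Hypothetical helper component, for illustration only
@component
class Increment:
    @component.output_types(value=int)
    def run(self, value: int):
        return {"value": value + 1}

routes = [
    {"condition": "{{value < 3}}", "output": "{{value}}", "output_name": "loop", "output_type": int},
    {"condition": "{{value >= 3}}", "output": "{{value}}", "output_name": "done", "output_type": int},
]

pipe = Pipeline()
pipe.add_component("joiner", BranchJoiner(int))
pipe.add_component("inc", Increment())
pipe.add_component("router", ConditionalRouter(routes))
pipe.connect("joiner", "inc.value")
pipe.connect("inc", "router")
pipe.connect("router.loop", "joiner")  # loops one value at a time back into the joiner

# Expected: the joiner sees exactly one input per iteration and the run
# terminates with {'router': {'done': 3}}
print(pipe.run(data={"joiner": {"value": 0}}))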

Actual behavior:
The pipeline fails when the loop feeds back into the BranchJoiner, causing it to receive multiple inputs at once and raising the above-mentioned exception.


mrm1001 commented Jul 5, 2024

Hi @vblagoje, is this error related to this issue: #7740?


vblagoje commented Jul 5, 2024

@mrm1001 Yes, we can't do any ReAct agent loops until this one has been resolved.

@shadeMe shadeMe added the type:bug Something isn't working label Jul 5, 2024
@mrm1001 mrm1001 added the P1 High priority, add to the next sprint label Jul 5, 2024