Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 41 additions & 24 deletions lib/chat_models/chat_google_ai.ex
Original file line number Diff line number Diff line change
Expand Up @@ -631,30 +631,23 @@ defmodule LangChain.ChatModels.ChatGoogleAI do
{:ok, %Req.Response{status: 200, body: data} = response} ->
Callbacks.fire(google_ai.callbacks, :on_llm_response_headers, [response.headers])

# Separate message deltas by their content type
{data, _last_index} =
data
|> List.flatten()
|> Enum.reduce({[], nil}, fn
message_delta, {[], nil} ->
{[message_delta], message_delta.index}

message_delta, {acc, last_index} ->
[last_message_delta | _] = acc
last_content_type = get_in(last_message_delta.content.type)
content_type = get_in(message_delta.content.type)

new_index =
case not is_nil(content_type) && content_type != last_content_type do
true -> last_index + 1
false -> last_index
end

{[%{message_delta | index: new_index} | acc], new_index}
end)

data
|> Enum.reverse()
flattened = List.flatten(data)

# Some candidates can come back as `{:error, %LangChainError{}}` from
# `do_process_response/3` (e.g. `MALFORMED_FUNCTION_CALL`, candidates
# without a "content" key, or unknown shapes). The reindexing logic below
# assumes every item is a `%MessageDelta{}` with an `:index`, so any error
# tuple in the list must be surfaced before the reduce — otherwise it
# crashes with `KeyError` on `message_delta.index`.
case Enum.find(flattened, &match?({:error, _}, &1)) do
{:error, %LangChainError{} = error} ->
{:error, error}

nil ->
flattened
|> reindex_deltas()
|> Enum.reverse()
end

{:ok, %Req.Response{body: {:error, %LangChainError{} = error}}} ->
{:error, error}
Expand Down Expand Up @@ -701,6 +694,30 @@ defmodule LangChain.ChatModels.ChatGoogleAI do
end
end

# Separate message deltas by their content type by reindexing them.
defp reindex_deltas(deltas) do
{data, _last_index} =
Enum.reduce(deltas, {[], nil}, fn
message_delta, {[], nil} ->
{[message_delta], message_delta.index}

message_delta, {acc, last_index} ->
[last_message_delta | _] = acc
last_content_type = get_in(last_message_delta.content.type)
content_type = get_in(message_delta.content.type)

Comment thread
nelsonkopliku marked this conversation as resolved.
new_index =
case not is_nil(content_type) && content_type != last_content_type do
true -> last_index + 1
false -> last_index
end

{[%{message_delta | index: new_index} | acc], new_index}
end)

data
end

# Convert Google AI error status to a LangChainError type string.
defp google_error_type(%{"status" => status}) when is_binary(status) do
status |> String.downcase()
Expand Down
77 changes: 77 additions & 0 deletions test/chat_models/chat_google_ai_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1986,5 +1986,82 @@ defmodule ChatModels.ChatGoogleAITest do
error = LangChainError.exception(type: "resource_exhausted", message: "Quota exceeded")
refute ChatGoogleAI.retry_on_fallback?(error)
end

test "streaming MALFORMED_FUNCTION_CALL candidate returns error instead of crashing" do
# When Gemini cannot form a valid function call, it returns a candidate
# without a "content" key, e.g.:
#
# %{"finishMessage" => "Malformed function call: ...",
# "finishReason" => "MALFORMED_FUNCTION_CALL",
# "index" => 0}
#
# `do_process_response/3` falls through to the `_other` clause and emits
# an `{:error, %LangChainError{}}` for that candidate. This used to crash
# the streaming reduce in `do_api_request/3` with `KeyError` on `:index`.

candidate_error1 =
LangChainError.exception(
type: "unexpected_response",
message: "Unexpected response 1",
original: %{
"finishMessage" => "Malformed function call 1",
"finishReason" => "MALFORMED_FUNCTION_CALL",
"index" => 0
}
)

candidate_error2 =
LangChainError.exception(
type: "unexpected_response",
message: "Unexpected response 2",
original: %{
"finishMessage" => "Malformed function call 2",
"finishReason" => "MALFORMED_FUNCTION_CALL",
"index" => 0
}
)

delta1 = %LangChain.MessageDelta{
content: %LangChain.Message.ContentPart{type: :text, content: "Part 1"},
index: 0,
role: :assistant,
status: :incomplete
}

delta2 = %LangChain.MessageDelta{
content: %LangChain.Message.ContentPart{type: :text, content: "Part 2"},
index: 0,
role: :assistant,
status: :incomplete
}

model =
ChatGoogleAI.new!(%{
stream: true,
model: "gemini-2.5-flash"
})

expect(Req, :post, fn _req, _opts ->
{:ok,
%Req.Response{
status: 200,
headers: %{},
body: [
[delta1],
[{:error, candidate_error1}],
[delta2],
[{:error, candidate_error2}]
]
}}
end)

# The call should return the first error it encounters
assert {:error, %LangChainError{} = error} =
ChatGoogleAI.call(model, [Message.new_user!("Hello")])

assert error.type == "unexpected_response"
assert error.message == "Unexpected response 1"
assert error.original["finishReason"] == "MALFORMED_FUNCTION_CALL"
end
end
end