エヴァのMAGIシステムもどきを作ってみた
※ 画像・動画等のコンテンツが上手く生成されないことがあります。その際はお手数ですが、ページの更新を複数回お願いします。
目次
概要
APIが公開されているテキスト生成AIモデルを利用して3つのAIに同じ質問をする。その結果を集約して、エヴァに出てきたような「MAGIシステム」を実現したい。
想定
今回、私の今後のこのシステムの使用頻度と大学生の金銭的都合から、「GPT-3.5 Turbo」と「Claude-Instant v1」の2体に議題を投げ、それぞれの意見のまとめ役と結論付けを「claude v2」にやってもらうことにした。(余裕が生まれ次第、AIモデルを増やしたい...)
使用技術
Go
今回のベース。AI系のAPI(SDK)とやたら相性の良いPythonでも良かったが、
- CLI制作といえばGo
- 個人利用のみでPublicなサービスにしないから
- それぞれ生成AIのレスポンス自体に時間がかかる
ということでGoを採用。
go 1.20
AWS Bedrock
GPT4よりも優秀と海外で話題のAnthropic製「Claude v2」のAPIを使う唯一の手段。「Claude-Instant v1」のAPIもBedrock経由で使うことになる。
SDK
GoとAWSということでお馴染みのaws-sdk-go-v2を使ってAWS Bedrockを操作していく。
OpenAI API
皆さんお馴染み、ChatGPTでよく使う「GPT-3.5 Turbo」のAPIを利用する手段。
SDK
GPT-3.5 Turboの操作は、OpenAIのAPIをGo向けに提供している SDKを使用する。似たSDKの中で一番Star数が多かった。
(余談だが、APIの無料期間が3ヶ月あると思い込んでいたが、実際にはChatGPT登録から3ヶ月の勘違いで、課金する羽目になった( ; ; ))
作ってみた
プロンプトの作成
AIモデル2体に議題を投げるため、そのためのベースとなるプロンプトを組む。
また、最終的に論をまとめ上げ結論づけてもらうためのプロンプトを組む。
コーディング
要所要所に説明する。
全体のコードは、こちら
各種API,SDKのクライアント初期化
特別なことはしてないが、AWSBedrockの経由でClaude2を利用のにAPIキーとかは必要なく、AWS Bedrockのアクセス許可のあるIAMユーザーのみを用意。
OpenAIのAPIキーは事前に取得しておく。 以前、【Claude2】Go+AWS BedrockでClaude2のAPIを使ってみるという記事を書いたのでこちらも参考に。
※ init関数にはエラーハンドリングを要する処理を書くなと聞いたことがあるため、main関数またはその他に移しても良いかも。
Go
var brc *bedrockruntime.Client var client *openai.Client func init() { err := godotenv.Load(".env") if err != nil { log.Fatal("Error loading .env file") } // OpenAI API Client client = openai.NewClient(os.Getenv("OPENAI_API_KEY")) // AWS Bedrock Runtime Client region := os.Getenv("AWS_REGION") if region == "" { region = defaultRegion } cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(region), config.WithSharedConfigProfile(os.Getenv("AWS_PROFILE"))) if err != nil { log.Fatal(err) } brc = bedrockruntime.NewFromConfig(cfg) }
CLIの整備
CLIで対話的に行う。 議題を1回投げたら、Goroutinを使って2つのAPI(GPT-3.5 Turbo,Claude Instant v1)を叩いて並行処理させたい。Goroutinを使う理由は順にAPI叩いてたら時間かかるため。
一応、それぞれの AIモデルの意見も見てみたいから、出力しておく。
Go
func main() { var gpt35Answer string var claudeInstantAnswer string // プロンプトの入力 fmt.Print("Enter your prompt: ") var userPrompt string fmt.Scanln(&userPrompt) var wg sync.WaitGroup wg.Add(2) // GPT-3.5 Turboにプロンプトを送信(並行処理) go func() { defer wg.Done() fmt.Println("Sending to GPT-3.5 Turbo...") gpt35Answer = sendGPT35(userPrompt) fmt.Println("GPT-3.5 Turbo is expressed!!!") }() // Claude Instantにプロンプトを送信(並行処理) go func() { defer wg.Done() fmt.Println("Sending to Claude Instant...") claudeInstantAnswer = sendClaudeInstantV1(userPrompt) fmt.Println("Claude Instant is expressed!!!") }() // 並行処理が完了するまで待つ wg.Wait() // "..." の表示をクリア fmt.Print("\033[2K\r") fmt.Println("--------------------------------------------------") fmt.Println("GPT-3.5 Turbo: \n", gpt35Answer) fmt.Println("--------------------------------------------------") fmt.Println("Claude Instant: \n", claudeInstantAnswer) fmt.Println("--------------------------------------------------") fmt.Println("Awaiting final opinions...") // 2秒間の待機 time.Sleep(2 * time.Second) // 2つの回答をClaude2にまとめてもらう _, err := sendClaude2(gpt35Answer, claudeInstantAnswer) if err != nil { log.Fatal("Error sending to Claude2: ", err) } }
GPT-3.5 Turboにプロンプトを送信
投げつけるだけ。
Go
func sendGPT35(prompt string) string { resp, err := client.CreateChatCompletion( context.Background(), openai.ChatCompletionRequest{ Model: openai.GPT3Dot5Turbo, Messages: []openai.ChatCompletionMessage{ { Role: openai.ChatMessageRoleUser, Content: basePrompt + prompt, }, }, }, ) if err != nil { log.Fatal(err) } return resp.Choices[0].Message.Content }
Claude Instantにプロンプトを送信
メッセージのフォーマットとして、AWS Bedrock経由でClaudeのAPIを使うにはHuman,Assistantともに必須人物となる。これがないとエラー。
Go
const claudePromptFormat = "\n\nHuman: %s\n\nAssistant:" func sendClaudeInstantV1(prompt string) string { payload := Request{Prompt: fmt.Sprintf(claudePromptFormat, basePrompt+prompt), MaxTokensToSample: 2048} payloadBytes, err := json.Marshal(payload) if err != nil { log.Fatal(err) } output, err := brc.InvokeModel(context.Background(), &bedrockruntime.InvokeModelInput{ Body: payloadBytes, ModelId: aws.String("anthropic.claude-instant-v1"), ContentType: aws.String("application/json"), }) if err != nil { log.Fatal(err) } var resp Response err = json.Unmarshal(output.Body, &resp) if err != nil { log.Fatal(err) } return resp.Completion } // request/response model type Request struct { Prompt string `json:"prompt"` MaxTokensToSample int `json:"max_tokens_to_sample"` Temperature float64 `json:"temperature,omitempty"` TopP float64 `json:"top_p,omitempty"` TopK int `json:"top_k,omitempty"` StopSequences []string `json:"stop_sequences,omitempty"` } type Response struct { Completion string `json:"completion"` }
2つの回答をClaude2にまとめて結論を出してもらう
こちらも部分的にClaude Instantと同じ。違う点は、レスポンスをストリーミング形式で出してもらうところだ。この場合のストリーミング形式とは、生成された文字を少しずつ表示していくこと。
InvokeModelWithResponseStream で呼び出し、そのストリーミング形式の応答データが入ったレスポンスとfunc(ctx context.Context, part []byte) であるストリーム用アウトプットハンドラーを引数に与えて、processStreamingOutput にぶち込む。 processStreamingOutput ではレスポンス内にあるGetStream().Events() をループさせてストリームからのイベントを受信している。一応、イベントのタイプに応じた処理をSwitchで行って、断片的に送られてくるチャンクデータとエラーを仕分けしている。
Go
output, err := brc.InvokeModelWithResponseStream(context.Background(), &bedrockruntime.InvokeModelWithResponseStreamInput{ Body: payloadBytes, ModelId: aws.String("anthropic.claude-v2"), ContentType: aws.String("application/json"), }) type StreamingOutputHandler func(ctx context.Context, part []byte) error func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, handler StreamingOutputHandler) (Response, error)
全体がこれ↓
Go
func sendClaude2(answer1, answer2 string) (string, error) { combinedPrompt := fmt.Sprintf("GPT-3.5 Turbo: %s\n\nClaude Instant: %s", answer1, answer2) payload := Request{Prompt: fmt.Sprintf(claudePromptFormat, askClaude2+combinedPrompt), MaxTokensToSample: 2048} payloadBytes, err := json.Marshal(payload) if err != nil { return "", err } output, err := brc.InvokeModelWithResponseStream(context.Background(), &bedrockruntime.InvokeModelWithResponseStreamInput{ Body: payloadBytes, ModelId: aws.String("anthropic.claude-v2"), ContentType: aws.String("application/json"), }) if err != nil { return "", err } resp, err := processStreamingOutput(output, func(ctx context.Context, part []byte) error { fmt.Print(string(part)) return nil }) if err != nil { log.Fatal("streaming output processing error: ", err) } return resp.Completion, nil } type StreamingOutputHandler func(ctx context.Context, part []byte) error func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, handler StreamingOutputHandler) (Response, error) { var combinedResult string resp := Response{} for event := range output.GetStream().Events() { switch v := event.(type) { case *types.ResponseStreamMemberChunk: //fmt.Println("payload", string(v.Value.Bytes)) var resp Response err := json.NewDecoder(bytes.NewReader(v.Value.Bytes)).Decode(&resp) if err != nil { return resp, err } handler(context.Background(), []byte(resp.Completion)) combinedResult += resp.Completion case *types.UnknownUnionMember: fmt.Println("unknown tag:", v.Tag) default: fmt.Println("union is nil or unknown type") } } resp.Completion = combinedResult return resp, nil } // request/response model type Request struct { Prompt string `json:"prompt"` MaxTokensToSample int `json:"max_tokens_to_sample"` Temperature float64 `json:"temperature,omitempty"` TopP float64 `json:"top_p,omitempty"` TopK int `json:"top_k,omitempty"` StopSequences []string `json:"stop_sequences,omitempty"` } type Response struct { Completion string `json:"completion"` }
仕上がり
感想
エヴァ本編では、MAGIシステムは使徒との戦闘における作戦の立案や検討などを行うが、私はこのシステムのAIモデル数さらに増やして、学校のレポートとの戦闘に活用と思う。
あとbedrockのsdkでストリーミング形式でレスポンスをもらえることに驚いた。