mirror of
https://github.com/ollama/ollama.git
synced 2026-05-05 23:53:43 -05:00
server: cache show responses (#15967)
This commit is contained in:
27
server/model_caches.go
Normal file
27
server/model_caches.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package server
|
||||
|
||||
import "context"
|
||||
|
||||
type modelCaches struct {
|
||||
recommendations *modelRecommendationsCache
|
||||
show *modelShowCache
|
||||
}
|
||||
|
||||
func newModelCaches() *modelCaches {
|
||||
return &modelCaches{
|
||||
recommendations: newModelRecommendationsCache(),
|
||||
show: newModelShowCache(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *modelCaches) Start(ctx context.Context) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if c.recommendations != nil {
|
||||
c.recommendations.Start(ctx)
|
||||
}
|
||||
if c.show != nil {
|
||||
c.show.Start(ctx)
|
||||
}
|
||||
}
|
||||
@@ -306,7 +306,7 @@ func TestModelRecommendationsHandlerUsesCache(t *testing.T) {
|
||||
ctx, _ := gin.CreateTestContext(w)
|
||||
ctx.Request = httptest.NewRequest(http.MethodGet, "/api/experimental/model-recommendations", nil)
|
||||
|
||||
s := &Server{modelRecommendations: cache}
|
||||
s := &Server{modelCaches: &modelCaches{recommendations: cache}}
|
||||
s.ModelRecommendationsExperimentalHandler(ctx)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
@@ -326,7 +326,7 @@ func TestModelRecommendationsRouteRegistration(t *testing.T) {
|
||||
|
||||
cache := newModelRecommendationsCache()
|
||||
cache.set([]api.ModelRecommendation{{Model: "route-model", Description: "route description"}})
|
||||
s := &Server{modelRecommendations: cache}
|
||||
s := &Server{modelCaches: &modelCaches{recommendations: cache}}
|
||||
|
||||
router, err := s.GenerateRoutes(nil)
|
||||
if err != nil {
|
||||
|
||||
705
server/model_show_cache.go
Normal file
705
server/model_show_cache.go
Normal file
@@ -0,0 +1,705 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ollama/ollama/api"
|
||||
internalcloud "github.com/ollama/ollama/internal/cloud"
|
||||
"github.com/ollama/ollama/internal/modelref"
|
||||
"github.com/ollama/ollama/manifest"
|
||||
"github.com/ollama/ollama/types/model"
|
||||
"github.com/ollama/ollama/version"
|
||||
)
|
||||
|
||||
/*
|
||||
The /api/show cache stores full api.ShowResponse values because callers use
|
||||
more than capabilities: launch flows also need context length, embeddings
|
||||
metadata, quantization details, remote metadata, and model-specific fields.
|
||||
|
||||
Local model entries are stored by canonical model name and verbose flag, with
|
||||
the manifest digest recorded in the entry. The manifest digest is the freshness
|
||||
boundary: if the model content changes, the digest changes, so the previous
|
||||
response is replaced instead of accumulating under an old digest key. Requests
|
||||
with System or Options overlays bypass the cache because those overlays mutate
|
||||
the effective show response.
|
||||
|
||||
Cloud model entries are keyed by normalized cloud base model name and verbose.
|
||||
They use stale-while-revalidate behavior: a warm read returns the cached
|
||||
response immediately and starts a throttled background refresh for that model.
|
||||
Cold cloud reads preserve existing proxy behavior. Local and cloud entries live
|
||||
in separate maps, so a local "qwen3.5" and an explicit "qwen3.5:cloud" cannot
|
||||
collide. The cloud suffix is request routing intent; api.ShowResponse does not
|
||||
carry a model-name field to reconstruct on the way out.
|
||||
|
||||
The cache is process-local. Startup hydration runs asynchronously from the
|
||||
current local manifests and cloud tags; no show responses are written to or read
|
||||
from ~/.ollama/cache/show. That keeps cache lifetime tied to the server process
|
||||
and avoids snapshot freshness and invalidation cases for this iteration.
|
||||
*/
|
||||
|
||||
// Tuning values for the cloud side of the show cache.
const (
	// modelShowCloudFetchTimeout bounds each cloud request the cache issues.
	modelShowCloudFetchTimeout = 3 * time.Second

	// modelShowCloudReadRefreshCooldown is how long after a read-triggered
	// refresh finishes before another may start for the same key.
	modelShowCloudReadRefreshCooldown = 5 * time.Second

	// modelShowCloudHydrationConcurrency caps the number of workers used by
	// startup cloud hydration.
	modelShowCloudHydrationConcurrency = 4
)

// errModelShowNoCloud is returned by cloud cache operations when cloud is
// disabled.
var errModelShowNoCloud = errors.New("cloud disabled")
|
||||
|
||||
// modelShowCache owns process-local show response caches for local and cloud
|
||||
// models. All cached responses are cloned at read/write boundaries so
|
||||
// handler-specific mutations, such as user-agent compatibility tweaks, cannot
|
||||
// leak back into the cache.
|
||||
type modelShowCache struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
local map[modelShowLocalKey]modelShowLocalEntry
|
||||
cloud map[modelShowCloudKey]*api.ShowResponse
|
||||
|
||||
cloudRefreshing map[modelShowCloudKey]bool
|
||||
cloudNextReadRefreshAfter map[modelShowCloudKey]time.Time
|
||||
|
||||
once sync.Once
|
||||
client *http.Client
|
||||
getModelInfo func(api.ShowRequest) (*api.ShowResponse, error)
|
||||
}
|
||||
|
||||
// modelShowLocalKey identifies a local cache slot by model name and verbose
// flag. The manifest digest is kept in the entry rather than in the key, so a
// pulled or recreated model replaces the response in the same slot instead of
// stranding an entry under its old digest.
//
// Entries for deleted models are not pruned eagerly from this process-local
// cache. Manifest resolution happens before the cache lookup, so such entries
// are never served, and they disappear when the process restarts.
type modelShowLocalKey struct {
	// Model is the model name the slot belongs to.
	Model string

	// Verbose separates verbose responses from non-verbose ones.
	Verbose bool
}
|
||||
|
||||
type modelShowLocalEntry struct {
|
||||
Digest string
|
||||
Response *api.ShowResponse
|
||||
}
|
||||
|
||||
// modelShowCloudKey identifies a cloud cache slot by normalized cloud base
// model name and verbose flag. It deliberately carries no local digest: cloud
// entries are kept fresh through stale-while-revalidate refreshes instead.
type modelShowCloudKey struct {
	// Model is the normalized cloud model name.
	Model string

	// Verbose separates verbose responses from non-verbose ones.
	Verbose bool
}
|
||||
|
||||
func newModelShowCache() *modelShowCache {
|
||||
return &modelShowCache{
|
||||
local: make(map[modelShowLocalKey]modelShowLocalEntry),
|
||||
cloud: make(map[modelShowCloudKey]*api.ShowResponse),
|
||||
cloudRefreshing: make(map[modelShowCloudKey]bool),
|
||||
cloudNextReadRefreshAfter: make(map[modelShowCloudKey]time.Time),
|
||||
client: http.DefaultClient,
|
||||
getModelInfo: GetModelInfo,
|
||||
}
|
||||
}
|
||||
|
||||
// modelShowCacheable returns whether a request can use the shared show cache.
|
||||
// System and Options overlays are request-specific response variants, so v1
|
||||
// bypasses caching for those rather than expanding the key space.
|
||||
func modelShowCacheable(req api.ShowRequest) bool {
|
||||
return req.System == "" && len(req.Options) == 0
|
||||
}
|
||||
|
||||
// Start kicks off non-blocking startup hydration. The cache remains
|
||||
// process-local; warm entries appear as the background local and cloud scans
|
||||
// populate the maps.
|
||||
func (c *modelShowCache) Start(ctx context.Context) {
|
||||
c.once.Do(func() {
|
||||
slog.Debug("starting model show cache")
|
||||
go c.runStartup(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// runStartup hydrates local and cloud caches concurrently. It is only called in
|
||||
// a goroutine from Start, so manifest scans and cloud requests cannot delay the
|
||||
// listener from accepting traffic.
|
||||
func (c *modelShowCache) runStartup(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := c.hydrateLocal(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
||||
slog.Warn("model show local cache hydration failed", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := c.hydrateCloud(ctx); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, context.Canceled):
|
||||
case errors.Is(err, errModelShowNoCloud):
|
||||
slog.Debug("skipping model show cloud cache hydration because cloud is disabled")
|
||||
default:
|
||||
slog.Warn("model show cloud cache hydration failed", "error", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// GetLocal returns a cached local show response when the current manifest
|
||||
// digest matches. On a miss, it falls back to GetModelInfo, stores non-remote
|
||||
// local responses, and returns a clone to the caller.
|
||||
func (c *modelShowCache) GetLocal(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
key, digest, err := modelShowLocalKeyForRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp, ok := c.getLocal(key, digest); ok {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
req.Model = key.Model
|
||||
resp, err := c.getModelInfo(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.RemoteHost == "" {
|
||||
c.setLocal(key, digest, resp)
|
||||
}
|
||||
|
||||
return cloneShowResponse(resp), nil
|
||||
}
|
||||
|
||||
// GetCloudSWR returns a cached cloud show response and triggers a throttled
|
||||
// background refresh. The boolean is false on a cold miss so callers can
|
||||
// preserve existing synchronous proxy behavior.
|
||||
func (c *modelShowCache) GetCloudSWR(ctx context.Context, req api.ShowRequest) (*api.ShowResponse, bool) {
|
||||
key := modelShowCloudKeyForModel(req.Model, req.Verbose)
|
||||
resp, ok := c.getCloud(key)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
c.triggerCloudRefreshOnRead(ctx, key)
|
||||
return resp, true
|
||||
}
|
||||
|
||||
func (c *modelShowCache) getLocal(key modelShowLocalKey, digest string) (*api.ShowResponse, bool) {
|
||||
c.mu.RLock()
|
||||
entry, ok := c.local[key]
|
||||
c.mu.RUnlock()
|
||||
if !ok || entry.Digest != digest || entry.Response == nil {
|
||||
return nil, false
|
||||
}
|
||||
return cloneShowResponse(entry.Response), true
|
||||
}
|
||||
|
||||
func (c *modelShowCache) setLocal(key modelShowLocalKey, digest string, resp *api.ShowResponse) {
|
||||
c.mu.Lock()
|
||||
c.local[key] = modelShowLocalEntry{
|
||||
Digest: digest,
|
||||
Response: cloneShowResponse(resp),
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *modelShowCache) hasLocal(key modelShowLocalKey, digest string) bool {
|
||||
c.mu.RLock()
|
||||
entry, ok := c.local[key]
|
||||
c.mu.RUnlock()
|
||||
return ok && entry.Digest == digest && entry.Response != nil
|
||||
}
|
||||
|
||||
func (c *modelShowCache) getCloud(key modelShowCloudKey) (*api.ShowResponse, bool) {
|
||||
c.mu.RLock()
|
||||
resp, ok := c.cloud[key]
|
||||
c.mu.RUnlock()
|
||||
if !ok || resp == nil {
|
||||
return nil, false
|
||||
}
|
||||
return cloneShowResponse(resp), true
|
||||
}
|
||||
|
||||
func (c *modelShowCache) setCloud(key modelShowCloudKey, resp *api.ShowResponse) {
|
||||
c.mu.Lock()
|
||||
c.cloud[key] = cloneShowResponse(resp)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *modelShowCache) beginCloudReadRefresh(key modelShowCloudKey) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
if c.cloudRefreshing[key] || now.Before(c.cloudNextReadRefreshAfter[key]) {
|
||||
return false
|
||||
}
|
||||
|
||||
c.cloudRefreshing[key] = true
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *modelShowCache) endCloudReadRefresh(key modelShowCloudKey) {
|
||||
c.mu.Lock()
|
||||
c.cloudRefreshing[key] = false
|
||||
c.cloudNextReadRefreshAfter[key] = time.Now().Add(modelShowCloudReadRefreshCooldown)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// triggerCloudRefreshOnRead starts the revalidation side of SWR. The refresh
|
||||
// uses context.WithoutCancel so a completed client request does not cancel the
|
||||
// cache update it initiated.
|
||||
func (c *modelShowCache) triggerCloudRefreshOnRead(ctx context.Context, key modelShowCloudKey) {
|
||||
if !c.beginCloudReadRefresh(key) {
|
||||
return
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ctx = context.WithoutCancel(ctx)
|
||||
|
||||
slog.Debug("triggering model show cloud refresh on read", "model", key.Model, "verbose", key.Verbose)
|
||||
go func() {
|
||||
defer c.endCloudReadRefresh(key)
|
||||
|
||||
if err := c.refreshCloud(ctx, key); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, errModelShowNoCloud):
|
||||
slog.Debug("skipping model show cloud read refresh because cloud is disabled", "model", key.Model)
|
||||
default:
|
||||
slog.Warn("model show cloud read refresh failed", "model", key.Model, "error", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// refreshCloud fetches and stores one cloud show response. Refresh failures are
|
||||
// returned without touching the existing cached entry, which preserves stale
|
||||
// data for future reads.
|
||||
func (c *modelShowCache) refreshCloud(ctx context.Context, key modelShowCloudKey) error {
|
||||
if disabled, _ := internalcloud.Status(); disabled {
|
||||
return errModelShowNoCloud
|
||||
}
|
||||
|
||||
resp, err := c.fetchCloudShow(ctx, key.Model, key.Verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.setCloud(key, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// hydrateLocal scans manifests at startup and refreshes only entries missing
|
||||
// for the current digest. It hydrates non-verbose responses only, avoiding an
|
||||
// expensive tensor walk for users who have never asked for verbose show data.
|
||||
func (c *modelShowCache) hydrateLocal(ctx context.Context) error {
|
||||
manifests, err := manifest.Manifests(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for name, mf := range manifests {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if modelShowManifestIsRemote(mf) {
|
||||
continue
|
||||
}
|
||||
|
||||
modelName := name.String()
|
||||
digest := mf.Digest()
|
||||
key := modelShowLocalKey{
|
||||
Model: modelName,
|
||||
Verbose: false,
|
||||
}
|
||||
if c.hasLocal(key, digest) {
|
||||
continue
|
||||
}
|
||||
|
||||
resp, err := c.getModelInfo(api.ShowRequest{Model: modelName})
|
||||
if err != nil {
|
||||
slog.Warn("failed to hydrate local model show cache", "model", modelName, "error", err)
|
||||
continue
|
||||
}
|
||||
if resp.RemoteHost != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
c.setLocal(key, digest, resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// hydrateCloud refreshes cloud show entries by listing cloud tags and fetching
|
||||
// /api/show for each returned model with bounded concurrency. Per-model show
|
||||
// failures are logged and skipped so one bad cloud entry does not prevent the
|
||||
// rest of the cache from warming.
|
||||
func (c *modelShowCache) hydrateCloud(ctx context.Context) error {
|
||||
if disabled, _ := internalcloud.Status(); disabled {
|
||||
return errModelShowNoCloud
|
||||
}
|
||||
|
||||
models, err := c.fetchCloudTags(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobs := make(chan string)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
worker := func() {
|
||||
defer wg.Done()
|
||||
for modelName := range jobs {
|
||||
if ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key := modelShowCloudKeyForModel(modelName, false)
|
||||
resp, err := c.fetchCloudShow(ctx, key.Model, key.Verbose)
|
||||
if err != nil {
|
||||
slog.Warn("failed to hydrate cloud model show cache", "model", key.Model, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
c.setCloud(key, resp)
|
||||
}
|
||||
}
|
||||
|
||||
workers := min(modelShowCloudHydrationConcurrency, max(1, len(models)))
|
||||
for range workers {
|
||||
wg.Add(1)
|
||||
go worker()
|
||||
}
|
||||
|
||||
sendLoop:
|
||||
for _, modelName := range models {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break sendLoop
|
||||
case jobs <- modelName:
|
||||
}
|
||||
}
|
||||
close(jobs)
|
||||
wg.Wait()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchCloudTags returns de-duplicated cloud model names normalized to their
|
||||
// show-cache key form. It accepts either ListModelResponse.Model or the legacy
|
||||
// Name field because /api/tags responses may contain both.
|
||||
func (c *modelShowCache) fetchCloudTags(ctx context.Context) ([]string, error) {
|
||||
var payload api.ListResponse
|
||||
if err := c.doCloudJSON(ctx, http.MethodGet, "/api/tags", nil, &payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{}, len(payload.Models))
|
||||
models := make([]string, 0, len(payload.Models))
|
||||
for _, item := range payload.Models {
|
||||
name := strings.TrimSpace(item.Model)
|
||||
if name == "" {
|
||||
name = strings.TrimSpace(item.Name)
|
||||
}
|
||||
name = modelShowNormalizeCloudModel(name)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[name]; ok {
|
||||
continue
|
||||
}
|
||||
seen[name] = struct{}{}
|
||||
models = append(models, name)
|
||||
}
|
||||
|
||||
return models, nil
|
||||
}
|
||||
|
||||
func (c *modelShowCache) fetchCloudShow(ctx context.Context, modelName string, verbose bool) (*api.ShowResponse, error) {
|
||||
payload := api.ShowRequest{
|
||||
Model: modelShowNormalizeCloudModel(modelName),
|
||||
Verbose: verbose,
|
||||
}
|
||||
|
||||
var resp api.ShowResponse
|
||||
if err := c.doCloudJSON(ctx, http.MethodPost, "/api/show", payload, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.ModelInfo == nil {
|
||||
resp.ModelInfo = map[string]any{}
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// doCloudJSON is the cache's direct cloud client. It mirrors the cloud proxy's
|
||||
// signing and client-version behavior but uses an internal timeout because
|
||||
// hydration and refreshes must not hang indefinitely.
|
||||
func (c *modelShowCache) doCloudJSON(ctx context.Context, method, path string, payload any, out any) error {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, modelShowCloudFetchTimeout)
|
||||
defer cancel()
|
||||
|
||||
baseURL, err := url.Parse(cloudProxyBaseURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
targetURL := baseURL.ResolveReference(&url.URL{Path: path})
|
||||
|
||||
var body io.Reader
|
||||
if payload != nil {
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = bytes.NewReader(data)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(reqCtx, method, targetURL.String(), body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
if payload != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
if clientVersion := strings.TrimSpace(version.Version); clientVersion != "" {
|
||||
req.Header.Set(cloudProxyClientVersionHeader, clientVersion)
|
||||
}
|
||||
|
||||
if err := cloudProxySignRequest(req.Context(), req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
return modelShowStatusError(resp, data)
|
||||
}
|
||||
|
||||
if out == nil {
|
||||
return nil
|
||||
}
|
||||
return json.Unmarshal(data, out)
|
||||
}
|
||||
|
||||
// modelShowStatusError preserves the important error shape from cloud
|
||||
// responses, including AuthorizationError for 401s and StatusError otherwise.
|
||||
func modelShowStatusError(resp *http.Response, body []byte) error {
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
err := api.AuthorizationError{
|
||||
StatusCode: resp.StatusCode,
|
||||
Status: resp.Status,
|
||||
}
|
||||
_ = json.Unmarshal(body, &err)
|
||||
if err.Status == "" {
|
||||
err.Status = resp.Status
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
statusErr := api.StatusError{
|
||||
StatusCode: resp.StatusCode,
|
||||
Status: resp.Status,
|
||||
}
|
||||
if err := json.Unmarshal(body, &statusErr); err != nil || statusErr.ErrorMessage == "" {
|
||||
statusErr.ErrorMessage = strings.TrimSpace(string(body))
|
||||
}
|
||||
return statusErr
|
||||
}
|
||||
|
||||
// modelShowLocalKeyForRequest normalizes a local show request to the canonical
|
||||
// on-disk model name and returns the current manifest digest used to validate
|
||||
// the cached entry.
|
||||
func modelShowLocalKeyForRequest(req api.ShowRequest) (modelShowLocalKey, string, error) {
|
||||
name := model.ParseName(req.Model)
|
||||
if !name.IsValid() {
|
||||
return modelShowLocalKey{}, "", model.Unqualified(name)
|
||||
}
|
||||
name, err := getExistingName(name)
|
||||
if err != nil {
|
||||
return modelShowLocalKey{}, "", err
|
||||
}
|
||||
|
||||
mf, err := manifest.ParseNamedManifest(name)
|
||||
if err != nil {
|
||||
return modelShowLocalKey{}, "", err
|
||||
}
|
||||
|
||||
return modelShowLocalKey{
|
||||
Model: name.String(),
|
||||
Verbose: req.Verbose,
|
||||
}, mf.Digest(), nil
|
||||
}
|
||||
|
||||
func modelShowCloudKeyForModel(modelName string, verbose bool) modelShowCloudKey {
|
||||
return modelShowCloudKey{
|
||||
Model: modelShowNormalizeCloudModel(modelName),
|
||||
Verbose: verbose,
|
||||
}
|
||||
}
|
||||
|
||||
// modelShowNormalizeCloudModel strips explicit cloud source syntax, including
|
||||
// legacy "-cloud" tags, so :cloud and -cloud forms share a cache entry.
|
||||
func modelShowNormalizeCloudModel(modelName string) string {
|
||||
modelName = strings.TrimSpace(modelName)
|
||||
if modelName == "" {
|
||||
return ""
|
||||
}
|
||||
if base, stripped := modelref.StripCloudSourceTag(modelName); stripped {
|
||||
return strings.TrimSpace(base)
|
||||
}
|
||||
return modelName
|
||||
}
|
||||
|
||||
// modelShowManifestIsRemote checks whether a manifest represents a local stub
|
||||
// for a remote model. Startup hydration skips these so the local content cache
|
||||
// does not store entries whose freshness is governed by cloud state.
|
||||
func modelShowManifestIsRemote(mf *manifest.Manifest) bool {
|
||||
if mf == nil || mf.Config.Digest == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
f, err := mf.Config.Open()
|
||||
if err != nil {
|
||||
slog.Warn("failed to open manifest config while checking model show cache eligibility", "error", err)
|
||||
return false
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var cfg model.ConfigV2
|
||||
if err := json.NewDecoder(f).Decode(&cfg); err != nil {
|
||||
slog.Warn("failed to decode manifest config while checking model show cache eligibility", "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return cfg.RemoteHost != "" || cfg.RemoteModel != ""
|
||||
}
|
||||
|
||||
// cloneShowResponse deep-copies mutable fields of api.ShowResponse before
|
||||
// storing or returning cached entries. The response contains maps and slices,
|
||||
// and some handlers mutate ModelInfo before writing JSON.
|
||||
func cloneShowResponse(in *api.ShowResponse) *api.ShowResponse {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
out := *in
|
||||
out.Details.Families = slices.Clone(in.Details.Families)
|
||||
out.Messages = cloneMessages(in.Messages)
|
||||
out.Capabilities = slices.Clone(in.Capabilities)
|
||||
out.ModelInfo = cloneAnyMap(in.ModelInfo)
|
||||
out.ProjectorInfo = cloneAnyMap(in.ProjectorInfo)
|
||||
out.Tensors = cloneTensors(in.Tensors)
|
||||
return &out
|
||||
}
|
||||
|
||||
func cloneMessages(in []api.Message) []api.Message {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := make([]api.Message, len(in))
|
||||
for i, msg := range in {
|
||||
out[i] = msg
|
||||
if msg.Images != nil {
|
||||
out[i].Images = make([]api.ImageData, len(msg.Images))
|
||||
for j, image := range msg.Images {
|
||||
out[i].Images[j] = slices.Clone(image)
|
||||
}
|
||||
}
|
||||
out[i].ToolCalls = slices.Clone(msg.ToolCalls)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func cloneTensors(in []api.Tensor) []api.Tensor {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := make([]api.Tensor, len(in))
|
||||
for i, tensor := range in {
|
||||
out[i] = tensor
|
||||
out[i].Shape = slices.Clone(tensor.Shape)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// cloneAnyMap returns a deep copy of in: every value is copied with cloneAny,
// so nested maps and slices in the copy share no storage with the original.
// A nil map yields nil.
func cloneAnyMap(in map[string]any) map[string]any {
	if in == nil {
		return nil
	}
	out := make(map[string]any, len(in))
	for k, v := range in {
		out[k] = cloneAny(v)
	}
	return out
}

// cloneAny returns a copy of v that is safe to mutate independently of the
// original. Values of type map[string]any and []any are copied recursively;
// slices of strings, bools, and the built-in numeric types listed below are
// copied with slices.Clone. Any other value is returned as is.
//
// Nil slices and maps stay nil, so a clone marshals to the same JSON as its
// source (null rather than an empty array or object).
func cloneAny(v any) any {
	switch v := v.(type) {
	case map[string]any:
		return cloneAnyMap(v)
	case []any:
		if v == nil {
			// Keep nil distinct from empty, as the typed-slice cases do.
			return v
		}
		out := make([]any, len(v))
		for i, item := range v {
			out[i] = cloneAny(item)
		}
		return out
	case []string:
		return slices.Clone(v)
	case []bool:
		return slices.Clone(v)
	case []int:
		return slices.Clone(v)
	case []int8:
		return slices.Clone(v)
	case []int16:
		return slices.Clone(v)
	case []int32:
		return slices.Clone(v)
	case []int64:
		return slices.Clone(v)
	case []uint:
		return slices.Clone(v)
	case []uint8:
		return slices.Clone(v)
	case []uint16:
		return slices.Clone(v)
	case []uint32:
		return slices.Clone(v)
	case []uint64:
		return slices.Clone(v)
	case []float32:
		return slices.Clone(v)
	case []float64:
		return slices.Clone(v)
	default:
		return v
	}
}
|
||||
501
server/model_show_cache_test.go
Normal file
501
server/model_show_cache_test.go
Normal file
@@ -0,0 +1,501 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"github.com/ollama/ollama/api"
|
||||
"github.com/ollama/ollama/envconfig"
|
||||
internalcloud "github.com/ollama/ollama/internal/cloud"
|
||||
"github.com/ollama/ollama/manifest"
|
||||
modelpkg "github.com/ollama/ollama/types/model"
|
||||
)
|
||||
|
||||
func TestModelShowCacheLocalHitUsesManifestDigest(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-local", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
calls := 0
|
||||
cache.getModelInfo = func(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
calls++
|
||||
return showCacheTestResponse(calls, req.Verbose), nil
|
||||
}
|
||||
|
||||
first, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-local"})
|
||||
if err != nil {
|
||||
t.Fatalf("first GetLocal failed: %v", err)
|
||||
}
|
||||
second, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-local"})
|
||||
if err != nil {
|
||||
t.Fatalf("second GetLocal failed: %v", err)
|
||||
}
|
||||
|
||||
if calls != 1 {
|
||||
t.Fatalf("getModelInfo calls = %d, want 1", calls)
|
||||
}
|
||||
if first.ModelInfo["call"] != 1 || second.ModelInfo["call"] != 1 {
|
||||
t.Fatalf("cached call markers = %v / %v, want both 1", first.ModelInfo["call"], second.ModelInfo["call"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheLocalManifestDigestChangeRefreshes(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-refresh", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
calls := 0
|
||||
cache.getModelInfo = func(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
calls++
|
||||
return showCacheTestResponse(calls, req.Verbose), nil
|
||||
}
|
||||
|
||||
if _, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-refresh"}); err != nil {
|
||||
t.Fatalf("first GetLocal failed: %v", err)
|
||||
}
|
||||
changeShowCacheManifest(t, "show-cache-refresh")
|
||||
refreshed, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-refresh"})
|
||||
if err != nil {
|
||||
t.Fatalf("refreshed GetLocal failed: %v", err)
|
||||
}
|
||||
|
||||
if calls != 2 {
|
||||
t.Fatalf("getModelInfo calls = %d, want 2", calls)
|
||||
}
|
||||
if refreshed.ModelInfo["call"] != 2 {
|
||||
t.Fatalf("refreshed call marker = %v, want 2", refreshed.ModelInfo["call"])
|
||||
}
|
||||
if len(cache.local) != 1 {
|
||||
t.Fatalf("local cache entries = %d, want 1", len(cache.local))
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheLocalVerboseVariantsAreSeparate(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-verbose", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
calls := 0
|
||||
cache.getModelInfo = func(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
calls++
|
||||
return showCacheTestResponse(calls, req.Verbose), nil
|
||||
}
|
||||
|
||||
plain, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-verbose"})
|
||||
if err != nil {
|
||||
t.Fatalf("plain GetLocal failed: %v", err)
|
||||
}
|
||||
verbose, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-verbose", Verbose: true})
|
||||
if err != nil {
|
||||
t.Fatalf("verbose GetLocal failed: %v", err)
|
||||
}
|
||||
plainAgain, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-verbose"})
|
||||
if err != nil {
|
||||
t.Fatalf("plain repeat GetLocal failed: %v", err)
|
||||
}
|
||||
|
||||
if calls != 2 {
|
||||
t.Fatalf("getModelInfo calls = %d, want 2", calls)
|
||||
}
|
||||
if plain.ModelInfo["verbose"] != false || verbose.ModelInfo["verbose"] != true || plainAgain.ModelInfo["call"] != 1 {
|
||||
t.Fatalf("unexpected verbose cache markers: plain=%v verbose=%v plainAgainCall=%v", plain.ModelInfo, verbose.ModelInfo, plainAgain.ModelInfo["call"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheLocalHydrationSkipsUnchangedInMemory(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-hydrate", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
calls := 0
|
||||
cache.getModelInfo = func(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
calls++
|
||||
return showCacheTestResponse(calls, req.Verbose), nil
|
||||
}
|
||||
|
||||
if err := cache.hydrateLocal(context.Background()); err != nil {
|
||||
t.Fatalf("first hydrateLocal failed: %v", err)
|
||||
}
|
||||
if err := cache.hydrateLocal(context.Background()); err != nil {
|
||||
t.Fatalf("second hydrateLocal failed: %v", err)
|
||||
}
|
||||
resp, err := cache.GetLocal(api.ShowRequest{Model: "show-cache-hydrate"})
|
||||
if err != nil {
|
||||
t.Fatalf("GetLocal after hydration failed: %v", err)
|
||||
}
|
||||
|
||||
if calls != 1 {
|
||||
t.Fatalf("getModelInfo calls after unchanged in-memory hydration = %d, want 1", calls)
|
||||
}
|
||||
if resp.ModelInfo["call"] != 1 {
|
||||
t.Fatalf("hydrated call marker = %v, want 1", resp.ModelInfo["call"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheBypassesSystemAndOptionsOverlays(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-overlay", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
key, digest, err := modelShowLocalKeyForRequest(api.ShowRequest{Model: "show-cache-overlay"})
|
||||
if err != nil {
|
||||
t.Fatalf("local key failed: %v", err)
|
||||
}
|
||||
cache.setLocal(key, digest, &api.ShowResponse{System: "cached", ModelInfo: map[string]any{}})
|
||||
|
||||
s := Server{modelCaches: &modelCaches{show: cache}}
|
||||
w := createRequest(t, s.ShowHandler, api.ShowRequest{
|
||||
Model: "show-cache-overlay",
|
||||
System: "overlay-system",
|
||||
})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp api.ShowResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode response failed: %v", err)
|
||||
}
|
||||
if resp.System != "overlay-system" {
|
||||
t.Fatalf("system = %q, want overlay-system", resp.System)
|
||||
}
|
||||
|
||||
w = createRequest(t, s.ShowHandler, api.ShowRequest{
|
||||
Model: "show-cache-overlay",
|
||||
Options: map[string]any{"num_ctx": float64(8192)},
|
||||
})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("options overlay status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode options response failed: %v", err)
|
||||
}
|
||||
if resp.System == "cached" {
|
||||
t.Fatalf("options overlay unexpectedly returned cached response")
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheLocalAndCloudSameBaseDoNotCollide(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
createShowCacheModel(t, "show-cache-dual", map[string]any{"test.context_length": uint32(1024)})
|
||||
|
||||
cache := newModelShowCache()
|
||||
cache.getModelInfo = func(req api.ShowRequest) (*api.ShowResponse, error) {
|
||||
return &api.ShowResponse{
|
||||
Details: api.ModelDetails{Format: "local"},
|
||||
ModelInfo: map[string]any{},
|
||||
}, nil
|
||||
}
|
||||
cloudKey := modelShowCloudKeyForModel("show-cache-dual:cloud", false)
|
||||
cache.setCloud(cloudKey, &api.ShowResponse{
|
||||
Details: api.ModelDetails{Format: "cloud"},
|
||||
ModelInfo: map[string]any{},
|
||||
})
|
||||
cache.mu.Lock()
|
||||
cache.cloudNextReadRefreshAfter[cloudKey] = time.Now().Add(time.Hour)
|
||||
cache.mu.Unlock()
|
||||
|
||||
s := Server{modelCaches: &modelCaches{show: cache}}
|
||||
|
||||
w := createRequest(t, s.ShowHandler, api.ShowRequest{Model: "show-cache-dual:cloud"})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("cloud status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var cloudResp api.ShowResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&cloudResp); err != nil {
|
||||
t.Fatalf("decode cloud response failed: %v", err)
|
||||
}
|
||||
if cloudResp.Details.Format != "cloud" {
|
||||
t.Fatalf("cloud format = %q, want cloud", cloudResp.Details.Format)
|
||||
}
|
||||
|
||||
w = createRequest(t, s.ShowHandler, api.ShowRequest{Model: "show-cache-dual"})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("local status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var localResp api.ShowResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&localResp); err != nil {
|
||||
t.Fatalf("decode local response failed: %v", err)
|
||||
}
|
||||
if localResp.Details.Format != "local" {
|
||||
t.Fatalf("local format = %q, want local", localResp.Details.Format)
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheCloudWarmHitReturnsStaleAndRefreshes(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
|
||||
refreshDone := make(chan struct{})
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/api/show" {
|
||||
t.Fatalf("unexpected upstream path %q", r.URL.Path)
|
||||
}
|
||||
defer close(refreshDone)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"details":{"format":"updated"},"model_info":{"source":"fresh"}}`))
|
||||
}))
|
||||
defer upstream.Close()
|
||||
withCloudProxyBaseURL(t, upstream.URL)
|
||||
|
||||
cache := newModelShowCache()
|
||||
cache.client = upstream.Client()
|
||||
cache.setCloud(modelShowCloudKeyForModel("kimi-k2.5:cloud", false), &api.ShowResponse{
|
||||
Details: api.ModelDetails{Format: "cached"},
|
||||
ModelInfo: map[string]any{"source": "stale"},
|
||||
})
|
||||
|
||||
s := Server{modelCaches: &modelCaches{show: cache}}
|
||||
w := createRequest(t, s.ShowHandler, api.ShowRequest{Model: "kimi-k2.5:cloud"})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp api.ShowResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode response failed: %v", err)
|
||||
}
|
||||
if resp.Details.Format != "cached" {
|
||||
t.Fatalf("format = %q, want cached", resp.Details.Format)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-refreshDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for cloud refresh")
|
||||
}
|
||||
waitForCondition(t, 2*time.Second, func() bool {
|
||||
resp, ok := cache.getCloud(modelShowCloudKeyForModel("kimi-k2.5:cloud", false))
|
||||
return ok && resp.Details.Format == "updated"
|
||||
})
|
||||
}
|
||||
|
||||
func TestModelShowCacheCloudColdMissFallsBackToProxy(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
|
||||
var capturedPath, capturedBody string
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
capturedPath = r.URL.Path
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
capturedBody = string(body)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"details":{"format":"cold"},"model_info":{}}`))
|
||||
}))
|
||||
defer upstream.Close()
|
||||
withCloudProxyBaseURL(t, upstream.URL)
|
||||
|
||||
s := &Server{modelCaches: &modelCaches{show: newModelShowCache()}}
|
||||
router, err := s.GenerateRoutes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("GenerateRoutes failed: %v", err)
|
||||
}
|
||||
local := httptest.NewServer(router)
|
||||
defer local.Close()
|
||||
|
||||
req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, local.URL+"/api/show", bytes.NewBufferString(`{"model":"kimi-k2.5:cloud"}`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := local.Client().Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("status = %d, want 200: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
if capturedPath != "/api/show" {
|
||||
t.Fatalf("upstream path = %q, want /api/show", capturedPath)
|
||||
}
|
||||
if !strings.Contains(capturedBody, `"model":"kimi-k2.5"`) {
|
||||
t.Fatalf("expected normalized model in upstream body, got %q", capturedBody)
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheCloudHydrationUsesTagsAndShow(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
|
||||
var mu sync.Mutex
|
||||
var showModels []string
|
||||
var tagsCalled bool
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
switch r.URL.Path {
|
||||
case "/api/tags":
|
||||
if r.Method != http.MethodGet {
|
||||
t.Fatalf("tags method = %s, want GET", r.Method)
|
||||
}
|
||||
mu.Lock()
|
||||
tagsCalled = true
|
||||
mu.Unlock()
|
||||
_, _ = w.Write([]byte(`{"models":[{"name":"alpha:cloud"},{"model":"beta"}]}`))
|
||||
case "/api/show":
|
||||
var req api.ShowRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
t.Fatalf("decode show request failed: %v", err)
|
||||
}
|
||||
mu.Lock()
|
||||
showModels = append(showModels, req.Model)
|
||||
mu.Unlock()
|
||||
_ = json.NewEncoder(w).Encode(api.ShowResponse{
|
||||
Details: api.ModelDetails{Format: req.Model},
|
||||
ModelInfo: map[string]any{"model": req.Model},
|
||||
})
|
||||
default:
|
||||
t.Fatalf("unexpected upstream path %q", r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer upstream.Close()
|
||||
withCloudProxyBaseURL(t, upstream.URL)
|
||||
|
||||
cache := newModelShowCache()
|
||||
cache.client = upstream.Client()
|
||||
if err := cache.hydrateCloud(context.Background()); err != nil {
|
||||
t.Fatalf("hydrateCloud failed: %v", err)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
gotTagsCalled := tagsCalled
|
||||
gotShowModels := slices.Clone(showModels)
|
||||
mu.Unlock()
|
||||
slices.Sort(gotShowModels)
|
||||
|
||||
if !gotTagsCalled {
|
||||
t.Fatal("expected /api/tags to be called")
|
||||
}
|
||||
if !slices.Equal(gotShowModels, []string{"alpha", "beta"}) {
|
||||
t.Fatalf("show models = %v, want [alpha beta]", gotShowModels)
|
||||
}
|
||||
for _, modelName := range gotShowModels {
|
||||
resp, ok := cache.getCloud(modelShowCloudKeyForModel(modelName, false))
|
||||
if !ok {
|
||||
t.Fatalf("missing cached cloud show response for %s", modelName)
|
||||
}
|
||||
if resp.Details.Format != modelName {
|
||||
t.Fatalf("cached format for %s = %q", modelName, resp.Details.Format)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheCloudKeyNormalizesSourceTags(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
" kimi-k2.5:cloud ": "kimi-k2.5",
|
||||
"gpt-oss:20b-cloud": "gpt-oss:20b",
|
||||
"qwen3": "qwen3",
|
||||
}
|
||||
|
||||
for input, want := range tests {
|
||||
if got := modelShowCloudKeyForModel(input, false).Model; got != want {
|
||||
t.Fatalf("cloud key model for %q = %q, want %q", input, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestModelShowCacheCloudDisabledDoesNotServeStale(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
setTestHome(t, t.TempDir())
|
||||
|
||||
t.Cleanup(envconfig.ReloadServerConfig)
|
||||
t.Setenv("OLLAMA_NO_CLOUD", "1")
|
||||
envconfig.ReloadServerConfig()
|
||||
|
||||
cache := newModelShowCache()
|
||||
cache.setCloud(modelShowCloudKeyForModel("kimi-k2.5:cloud", false), &api.ShowResponse{
|
||||
Details: api.ModelDetails{Format: "cached"},
|
||||
ModelInfo: map[string]any{},
|
||||
})
|
||||
|
||||
if err := cache.hydrateCloud(context.Background()); !errors.Is(err, errModelShowNoCloud) {
|
||||
t.Fatalf("hydrateCloud error = %v, want %v", err, errModelShowNoCloud)
|
||||
}
|
||||
|
||||
s := Server{modelCaches: &modelCaches{show: cache}}
|
||||
w := createRequest(t, s.ShowHandler, api.ShowRequest{Model: "kimi-k2.5:cloud"})
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Fatalf("status = %d, want 403: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), internalcloud.DisabledError(cloudErrRemoteModelDetailsUnavailable)) {
|
||||
t.Fatalf("unexpected disabled response: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func createShowCacheModel(t *testing.T, name string, kv map[string]any) {
|
||||
t.Helper()
|
||||
_, digest := createBinFile(t, kv, nil)
|
||||
var s Server
|
||||
w := createRequest(t, s.CreateHandler, api.CreateRequest{
|
||||
Model: name,
|
||||
Files: map[string]string{"model.gguf": digest},
|
||||
Stream: &stream,
|
||||
})
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("create model status = %d, want 200: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func changeShowCacheManifest(t *testing.T, name string) {
|
||||
t.Helper()
|
||||
n, err := getExistingName(modelpkg.ParseName(name))
|
||||
if err != nil {
|
||||
t.Fatalf("get existing name: %v", err)
|
||||
}
|
||||
mf, err := manifest.ParseNamedManifest(n)
|
||||
if err != nil {
|
||||
t.Fatalf("parse manifest: %v", err)
|
||||
}
|
||||
layer, err := manifest.NewLayer(strings.NewReader("changed"), "application/vnd.ollama.image.system")
|
||||
if err != nil {
|
||||
t.Fatalf("new layer: %v", err)
|
||||
}
|
||||
layers := append([]manifest.Layer(nil), mf.Layers...)
|
||||
layers = append(layers, layer)
|
||||
if err := manifest.WriteManifest(n, mf.Config, layers); err != nil {
|
||||
t.Fatalf("write manifest: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func showCacheTestResponse(call int, verbose bool) *api.ShowResponse {
|
||||
return &api.ShowResponse{
|
||||
Details: api.ModelDetails{
|
||||
Format: "gguf",
|
||||
Family: "test",
|
||||
},
|
||||
Capabilities: []modelpkg.Capability{modelpkg.CapabilityCompletion},
|
||||
ModelInfo: map[string]any{
|
||||
"call": call,
|
||||
"verbose": verbose,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func withCloudProxyBaseURL(t *testing.T, url string) {
|
||||
t.Helper()
|
||||
original := cloudProxyBaseURL
|
||||
cloudProxyBaseURL = url
|
||||
t.Cleanup(func() {
|
||||
cloudProxyBaseURL = original
|
||||
})
|
||||
}
|
||||
@@ -98,11 +98,11 @@ var useClient2 = experimentEnabled("client2")
|
||||
var mode string = gin.DebugMode
|
||||
|
||||
type Server struct {
|
||||
addr net.Addr
|
||||
sched *Scheduler
|
||||
defaultNumCtx int
|
||||
requestLogger *inferenceRequestLogger
|
||||
modelRecommendations *modelRecommendationsCache
|
||||
addr net.Addr
|
||||
sched *Scheduler
|
||||
defaultNumCtx int
|
||||
requestLogger *inferenceRequestLogger
|
||||
modelCaches *modelCaches
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -1143,13 +1143,33 @@ func (s *Server) ShowHandler(c *gin.Context) {
|
||||
|
||||
if modelRef.Source == modelSourceCloud {
|
||||
req.Model = modelRef.Base
|
||||
if modelShowCacheable(req) && s.modelCaches != nil && s.modelCaches.show != nil {
|
||||
if disabled, _ := internalcloud.Status(); disabled {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": internalcloud.DisabledError(cloudErrRemoteModelDetailsUnavailable)})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if c.Request != nil {
|
||||
ctx = c.Request.Context()
|
||||
}
|
||||
if resp, ok := s.modelCaches.show.GetCloudSWR(ctx, req); ok {
|
||||
c.JSON(http.StatusOK, resp)
|
||||
return
|
||||
}
|
||||
}
|
||||
proxyCloudJSONRequest(c, req, cloudErrRemoteModelDetailsUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
req.Model = modelRef.Base
|
||||
|
||||
resp, err := GetModelInfo(req)
|
||||
var resp *api.ShowResponse
|
||||
if modelShowCacheable(req) && s.modelCaches != nil && s.modelCaches.show != nil {
|
||||
resp, err = s.modelCaches.show.GetLocal(req)
|
||||
} else {
|
||||
resp, err = GetModelInfo(req)
|
||||
}
|
||||
if err != nil {
|
||||
var statusErr api.StatusError
|
||||
switch {
|
||||
@@ -1762,12 +1782,12 @@ func (s *Server) GenerateRoutes(rc *ollama.Registry) (http.Handler, error) {
|
||||
func (s *Server) ModelRecommendationsExperimentalHandler(c *gin.Context) {
|
||||
recs := defaultModelRecommendations
|
||||
source := "default"
|
||||
if s.modelRecommendations != nil {
|
||||
if s.modelCaches != nil && s.modelCaches.recommendations != nil {
|
||||
ctx := context.Background()
|
||||
if c.Request != nil {
|
||||
ctx = c.Request.Context()
|
||||
}
|
||||
recs = s.modelRecommendations.GetSWR(ctx)
|
||||
recs = s.modelCaches.recommendations.GetSWR(ctx)
|
||||
source = "cache"
|
||||
}
|
||||
|
||||
@@ -1811,12 +1831,9 @@ func Serve(ln net.Listener) error {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(parthsareen): If we add more runtime caches, prefer introducing a
|
||||
// small cache manager owned by Server (for shared start/stop/health wiring)
|
||||
// instead of adding one top-level field per cache here in Serve.
|
||||
s := &Server{
|
||||
addr: ln.Addr(),
|
||||
modelRecommendations: newModelRecommendationsCache(),
|
||||
addr: ln.Addr(),
|
||||
modelCaches: newModelCaches(),
|
||||
}
|
||||
if err := s.initRequestLogging(); err != nil {
|
||||
return err
|
||||
@@ -1842,7 +1859,7 @@ func Serve(ln net.Listener) error {
|
||||
schedCtx, schedDone := context.WithCancel(ctx)
|
||||
sched := InitScheduler(schedCtx)
|
||||
s.sched = sched
|
||||
s.modelRecommendations.Start(ctx)
|
||||
s.modelCaches.Start(ctx)
|
||||
|
||||
slog.Info(fmt.Sprintf("Listening on %s (version %s)", ln.Addr(), version.Version))
|
||||
srvr := &http.Server{
|
||||
|
||||
Reference in New Issue
Block a user