This commit is contained in:
2025-12-08 20:46:24 +08:00
parent 2cc93e0b48
commit 0400d2020b
2 changed files with 182 additions and 3 deletions

View File

@@ -3,6 +3,7 @@
//
#import "KBStreamFetcher.h"
#import "KBLocalizationManager.h"
@interface KBStreamFetcher () <NSURLSessionDataDelegate>
@property (nonatomic, strong) NSURLSession *session;
@@ -18,6 +19,7 @@
@property (nonatomic, strong) NSMutableArray<NSString *> *pendingQueue; //
@property (nonatomic, strong) NSTimer *flushTimer; //
@property (nonatomic, strong, nullable) NSError *finishError; //
@property (nonatomic, copy, nullable) NSString *pendingSplitTokenPrefix; // `<SPLIT>`
// Metrics
@property (nonatomic, assign) CFAbsoluteTime tStart; // start()
@@ -45,6 +47,8 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
return (remain >= expected) ? n : (NSUInteger)i;
}
static NSString * const kKBStreamSplitToken = @"<SPLIT>";
@implementation KBStreamFetcher
+ (instancetype)fetcherWithURL:(NSURL *)url {
@@ -68,6 +72,7 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
_flushInterval = 0.1;
_splitLargeDeltasOnWhitespace = YES;
_loggingEnabled = YES;
_pendingSplitTokenPrefix = nil;
}
return self;
}
@@ -107,6 +112,7 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
[self.pendingQueue removeAllObjects];
[self.flushTimer invalidate]; self.flushTimer = nil;
self.finishError = nil;
self.pendingSplitTokenPrefix = nil;
self.tStart = CFAbsoluteTimeGetCurrent();
self.tFirstByte = 0;
@@ -138,6 +144,7 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
[self.pendingQueue removeAllObjects];
[self.flushTimer invalidate]; self.flushTimer = nil;
self.finishError = nil;
self.pendingSplitTokenPrefix = nil;
}
#pragma mark - NSURLSessionDataDelegate
@@ -216,7 +223,17 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
[payload appendString:v ?: @""];
}
}
if (payload.length > 0) { [self enqueueChunk:payload]; }
if (payload.length > 0) {
if (self.loggingEnabled) {
NSLog(@"[KBStream] SSE raw payload: %@", payload);
}
NSString *llmText = nil;
if ([self processLLMChunkPayload:payload output:&llmText]) {
if (llmText.length > 0) { [self enqueueChunk:llmText]; }
} else {
[self enqueueChunk:payload];
}
}
}
}
return;
@@ -260,6 +277,9 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
}
}
if (payload.length > 0) {
if (self.loggingEnabled) {
NSLog(@"[KBStream] SSE raw payload: %@", payload);
}
NSString *delta = nil;
if ((NSInteger)payload.length >= self.deliveredCharCount) {
delta = [payload substringFromIndex:self.deliveredCharCount];
@@ -267,9 +287,21 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
delta = payload;
}
self.deliveredCharCount = payload.length;
if (delta.length > 0) { [self emitChunk:delta]; }
if (delta.length > 0) {
NSString *llmText = nil;
if ([self processLLMChunkPayload:delta output:&llmText]) {
if (llmText.length > 0) { [self emitChunk:llmText]; }
} else {
[self emitChunk:delta];
}
}
}
}
if (self.pendingSplitTokenPrefix.length > 0) {
NSString *carry = self.pendingSplitTokenPrefix;
self.pendingSplitTokenPrefix = nil;
if (carry.length > 0) { [self enqueueChunk:carry]; }
}
self.tFinish = CFAbsoluteTimeGetCurrent();
if (self.loggingEnabled) {
@@ -348,6 +380,101 @@ static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
self.lastChunkEndedWithTab = (lastc == '\t');
}
- (BOOL)processLLMChunkPayload:(NSString *)payload output:(NSString * _Nullable __autoreleasing *)output {
if (output) { *output = nil; }
if (payload.length == 0) { return NO; }
NSData *jsonData = [payload dataUsingEncoding:NSUTF8StringEncoding];
if (!jsonData) { return NO; }
NSError *jsonError = nil;
id obj = [NSJSONSerialization JSONObjectWithData:jsonData options:0 error:&jsonError];
if (jsonError || ![obj isKindOfClass:[NSDictionary class]]) { return NO; }
NSString *type = ((NSDictionary *)obj)[@"type"];
if (![type isKindOfClass:[NSString class]]) { return NO; }
if ([type isEqualToString:@"llm_chunk"]) {
id dataValue = ((NSDictionary *)obj)[@"data"];
if (![dataValue isKindOfClass:[NSString class]]) {
if (output) { *output = @""; }
return YES;
}
NSString *normalized = [self normalizedLLMDataString:(NSString *)dataValue];
if (output) { *output = normalized; }
return YES;
}
if ([type isEqualToString:@"search_result"]) {
NSString *searchText = [self normalizedSearchResultString:((NSDictionary *)obj)[@"data"]];
if (output) { *output = searchText ?: @""; }
return YES;
}
if ([type isEqualToString:@"done"]) {
if (output) { *output = @""; }
return YES;
}
return NO;
}
- (NSString *)normalizedLLMDataString:(NSString *)dataString {
NSString *combined = dataString ?: @"";
if (self.pendingSplitTokenPrefix.length > 0) {
combined = [self.pendingSplitTokenPrefix stringByAppendingString:combined];
self.pendingSplitTokenPrefix = nil;
}
if (combined.length == 0) { return @""; }
NSString *result = [combined stringByReplacingOccurrencesOfString:kKBStreamSplitToken withString:@"\t"];
NSString *suffix = [self pendingSplitPrefixSuffixForString:result];
if (suffix.length > 0) {
self.pendingSplitTokenPrefix = suffix;
result = [result substringToIndex:result.length - suffix.length];
}
return result;
}
- (NSString *)normalizedSearchResultString:(id)dataValue {
if (![dataValue isKindOfClass:[NSArray class]]) { return @""; }
NSArray *list = (NSArray *)dataValue;
NSMutableArray<NSString *> *segments = [NSMutableArray array];
for (NSUInteger i = 0; i < list.count; i++) {
id item = list[i];
NSString *payload = nil;
if ([item isKindOfClass:[NSDictionary class]]) {
id val = ((NSDictionary *)item)[@"payload"];
if ([val isKindOfClass:[NSString class]]) {
payload = (NSString *)val;
}
} else if ([item isKindOfClass:[NSString class]]) {
payload = (NSString *)item;
}
payload = [payload stringByTrimmingCharactersInSet:[NSCharacterSet whitespaceAndNewlineCharacterSet]];
if (payload.length == 0) { continue; }
NSString *line = [NSString stringWithFormat:@"%lu. %@", (unsigned long)(segments.count + 1), payload];
[segments addObject:line];
}
if (segments.count == 0) { return @""; }
NSString *title = KBLocalized(@"Search result");
NSMutableString *text = [NSMutableString string];
[text appendString:@"\t"];
[text appendFormat:@"%@:", title.length > 0 ? title : @"Search result"];
for (NSString *line in segments) {
[text appendString:@"\t"];
[text appendString:line];
}
return text;
}
- (NSString *)pendingSplitPrefixSuffixForString:(NSString *)text {
if (text.length == 0) { return @""; }
NSUInteger tokenLen = kKBStreamSplitToken.length;
if (tokenLen <= 1) { return @""; }
NSUInteger maxLen = MIN(tokenLen - 1, text.length);
for (NSUInteger len = maxLen; len > 0; len--) {
NSString *suffix = [text substringFromIndex:text.length - len];
NSString *prefix = [kKBStreamSplitToken substringToIndex:len];
if ([suffix isEqualToString:prefix]) {
return suffix;
}
}
return @"";
}
#pragma mark - Queue/Flush
- (void)enqueueChunk:(NSString *)s {