# coder: builds tokenized inputs, attention masks and labels from chat histories
# using the DeepSeek-R1-Distill-Qwen chat template.

from transformers import AutoTokenizer

class Coder:
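    """Encodes multi-turn chat histories into input_ids, attention_mask and
    labels for supervised fine-tuning, masking everything except assistant
    turns with the template's ignore index."""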
    def __init__(self, model, model_path):
        self.model = model
        self.model_path = model_path
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.model == 'ds_qwen2.5':
            # Chat-template markers for DeepSeek-R1-Distill-Qwen models;
            # 'mask' is the ignore index expected by the cross-entropy loss.
            self.template = {
                'system_begin': '<|begin▁of▁sentence|>',
                'system_end': '',
                'user_begin': '<|User|>',
                'user_end': '',
                'assistant_begin': '<|Assistant|>',
                'assistant_end': '<|end▁of▁sentence|>',
                'think_begin': '<think>',
                'think_end': '</think>',
                'mask': -100,
            }
    def check_history(self, history):
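        """Validate a chat history: it must start with a system turn, then
        strictly alternate user/assistant, and end with an assistant turn.
        Returns an empty string on success, otherwise a short error message."""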
        if len(history) < 2:
            return 'not enough history'
        item = history[0]
        if item['role'] != 'system':
            return 'not start with system'
        role0 = 'assistant'  # forces the first non-system turn to be 'user'
        for item in history[1:]:
            role = item['role']
            if role not in ['user', 'assistant']:
                return 'role error'
            if role == role0:
                return 'not user and assistant alternate'
            role0 = role
        if role == 'user':
            return 'end with user'
        return ''
    def decode(self, input_ids):
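        """Decode token ids back to text (handy for sanity-checking encode output)."""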
        text = self.tokenizer.decode(input_ids)
        return text
    def encode(self, conversations, last_turn=False, max_tokens=8192):
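        """Apply the chat template to `conversations` and tokenize the result.

        Labels are masked with the template's ignore index everywhere except
        assistant turns (only the most recent assistant turn when `last_turn`
        is True). Turns are processed from newest to oldest, so the oldest
        turns are dropped first if `max_tokens` would be exceeded."""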
        flag = self.check_history(conversations)
        if flag != '':
            # Validation failed; only the error flag is returned.
            return {'flag': flag}
        input_ids = []
        attention_mask = []
        labels = []
        input_text = ''
        labels_text = ''
        images = None  # multimodal placeholders; unused by this text-only template
        videos = None
        audios = None
        last = True    # True until the most recent assistant turn has been handled
        flag = ''
        # Walk the conversation from newest to oldest so that, when max_tokens
        # is hit, the oldest turns are the ones that get dropped.
        for item in conversations[::-1]:
            role = item['role']
            content = item['content']
            think = ''
            if self.template['think_begin'] in content and self.template['think_end'] in content:
                # Pull an inline <think>...</think> block out of the content;
                # it is re-attached by the assistant template below.
                think = content.split(self.template['think_begin'])[1].split(self.template['think_end'])[0]
                content = content.split(self.template['think_begin'])[0] + content.split(self.template['think_end'])[1]
            # An explicit 'think' field overrides anything parsed from the content.
            think = item.get('think', think)
            if role == 'system':
                content = self.template['system_begin'] + content + self.template['system_end']
            elif role == 'user':
                # A user turn also opens the assistant turn that answers it.
                content = self.template['user_begin'] + content + self.template['user_end'] + self.template['assistant_begin']
            elif role == 'assistant':
                # An assistant turn always carries a (possibly empty) <think> block.
                content = self.template['think_begin'] + think + self.template['think_end'] + content + self.template['assistant_end']
            ids = self.tokenizer.encode(content, add_special_tokens=False)
            if len(input_ids) + len(ids) > max_tokens:
                # Stop here; this turn and everything older than it are dropped.
                print('warning: max_tokens exceeded')
                flag = 'max_tokens exceeded'
                break
            # Prepend, since we are iterating from the newest turn backwards.
            input_ids = ids + input_ids
            input_text = content + input_text
            attention_mask = [1] * len(ids) + attention_mask
            if role == 'assistant':
                if last_turn:
                    # Train only on the most recent assistant turn.
                    if last:
                        labels = ids + labels
                        labels_text = content + labels_text
                        last = False
                    else:
                        labels = [self.template['mask']] * len(ids) + labels
                else:
                    # Train on every assistant turn.
                    labels = ids + labels
                    labels_text = content + labels_text
            else:
                # System and user turns never contribute to the loss.
                labels = [self.template['mask']] * len(ids) + labels

        d = {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels,
            'images': images,
            'videos': videos,
            'audios': audios,
            'input_text': input_text,
            'labels_text': labels_text,
            'flag': flag,
        }
        return d

if __name__ == '__main__':
    coder = Coder('ds_qwen2.5', '/home/liulongxiang5/models/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B')
    # check_history requires the history to end with an assistant turn.
    history = [
        {'role': 'system', 'content': 'You are an AI assistant. Please answer the questions asked by the user.'},
        {'role': 'user', 'content': 'Hello, I am Xiao Ming. Nice to meet you.'},
        {'role': 'assistant', 'content': 'Hello, Xiao Ming. Nice to meet you too.', 'think': '1|A'},
        {'role': 'user', 'content': 'Where are you from?'},
        {'role': 'assistant', 'content': 'I am from China.'},
        {'role': 'user', 'content': 'What is your name?'},
        {'role': 'assistant', 'content': 'My name is Xiao Ming.'},
        {'role': 'user', 'content': 'How old are you?'},
        {'role': 'assistant', 'content': 'I am 20 years old.'},
        {'role': 'user', 'content': 'Where do you live?'},
        {'role': 'assistant', 'content': 'I live in Beijing.'}
    ]
    d = coder.encode(history, last_turn=True)
    print(d)
    input_ids = d['input_ids']
    print(input_ids)
    text = coder.decode(input_ids)
    print(text)