Меню

Главная
Случайная статья
Настройки
Участник:AlphaRhoBot/abot.py
Материал из https://ru.wikipedia.org

Бот для архивации страниц обсуждения и форумов. Совместим с ClaymoreBot: использует те же страницы конфигурации и параметры. Если в конфиге есть параметр неизвестный боту (см. #Неподдерживаемые параметры), такие страницы пропускаются.

Отличия от ClaymoreBot
  • Если маркер переноса в архив {{шаблон}}, то бот будет искать {{шаблон}} и {{шаблон| (с учетом регистра).
  • Не проверяет Участник:ClaymoreBot/Архивация/Список для параметра обрабатывать. (TODO)
  • Параметры формат и страница являются синонимами.
  • Параметр тип игнорируется.


Неподдерживаемые параметры
  • тип = статьи для рецензирования
  • размер правки
  • размер архивации
  • решения
  • отклонённые заявки
#!/usr/bin/env python3
# -*- mode: python; coding: utf-8; -*-

import sys, os, re
from datetime import datetime, timedelta, timezone
import traceback
from pprint import pprint, pformat
import pywikibot
from pywikibot import textlib

today = datetime.now(timezone.utc)


TEST = True # сохранять страницы?

SORT_THREADS = False # Сортировать трэды в архиве или вставлять в нужное место

# шаблон конфигурации
config_template = 'Участник:ClaymoreBot/Архивация'
config_template_names = ('User:ClaymoreBot/Архивация',
                         'Участник:ClaymoreBot/Архивация')


site = pywikibot.Site('ru', 'wikipedia')



class Config:
    def __init__(self, configpage: pywikibot.Page):
        self.configpage = configpage.title() # for logging
        self.pagename = None # какую страницу архивировать
        self.archivepage = None # страница архива (шаблон)
        self.numbering  = False # тип = нумерация
        self.markers = [] # список маркеров переноса в архив: == Итог ==, {{closed}} и т.д.
        self.markerdelay = None # через сколько дней архивировать, если есть markers
        self.changedelay = 14 # через сколько дней архивировать, если нет изменений
        self.header = None # заголовок нового архива
        self.toup = False # по умолчанию новые снизу
        self.skipmarker = '' # маркер оставления трэда
        self.deltext = [] # текст, удаляемый при переносе в архив

        self.unknown_parameter = False
        self.template_not_found = False # шаблон не найде или не заполнен

        resultonly = False # итог = да -- архивировать только с итогом

        self.params = None
        for n, p in textlib.extract_templates_and_params(configpage.text, strip=True,
                                                         remove_disabled_parts=True):
            # n - имя шаблона, p - список параметров (OrderedDict)
            for ctn in config_template_names:
                if n == ctn:
                    # найден шаблон конфигурации
                    break
            else:
                continue
            self.params = p
            for i in p:
                # разбор параметров конфигурации
                if i == 'архивировать с': # маркеры архивации
                    e = 0
                    tt = ''
                    for m in re.finditer('"(.*?)"', p[i]):
                        # в кавычках
                        self.add_marker(m.group(1))
                        tt += p[i][e:m.start()]
                        e = m.end()
                    tt += p[i][e:]
                    for s in tt.split(','):
                        # без кавычек
                        s = s.strip()
                        if s:
                            self.add_marker(s)
                elif i == 'обрабатывать':
                    # FEXME: проверять [[Участник:ClaymoreBot/Архивация/Список]]
                    self.pagename = p[i]
                elif i == 'формат' or i == 'страница':
                    if p[i]:
                        self.archivepage = p[i]
                elif i == 'срок':
                    if p[i]: self.changedelay = int(p[i])
                elif i == 'задержка принудительной архивации':
                    self.markerdelay = int(p[i])
                elif i == 'заголовок':
                    self.header = p[i]
                elif i == 'итог':
                    if p[i] == 'да':
                        self.add_marker('== Итог ==')
                        self.add_marker('==Итог==')
                        self.add_marker('==Итог ==')
                        self.add_marker('== Итог==')
                        resultonly = True
                elif i == 'новые':
                    if p[i] == 'сверху':
                        self.toup = True
                elif i == 'тип':
                    if p[i] not in ('страница', 'месяц', 'квартал', 'полгода', 'год', 'нумерация'):
                        pywikibot.logging.warning('{}: unknown type: {}'.format(configpage, p[i]))
                        self.unknown_parameter = True
                elif i == 'тем в архиве':
                    self.numthreads = int(p[i])
                elif i == 'пропускать с':
                    self.skipmarker = p[i]
                elif i == 'убирать из архива':
                    self.deltext.append(p[i])
                elif i == 'убирать ссылки':
                    if p[i] == 'да':
                        self.deltext.append('http://')
                        self.deltext.append('https://')
                elif i not in ('абсолютный путь', 'показывать'): # игнорируемые параметры
                    pywikibot.logging.warning('{}: unknown parameter: {} = {}'.format(
                        configpage, i, p[i]))
                    self.unknown_parameter = True

            break

        if not self.params:
            # шаблон не найден - вставлен через другой шаблон (/Шапка и т.д.)
            pywikibot.logging.warning(configpage.title(as_link=True)+': template not found')
            self.template_not_found = True
            return

        if self.pagename is None:
            # если архивируемая страница не указана в конфиге
            # используем страницу конфига
            self.pagename = configpage.title()

        if self.archivepage is None:
            # если страница архива не указана в конфиге
            self.archivepage = self.pagename+'/Архив'
        else:
            if ':' not in self.archivepage: # относительный путь
                self.archivepage = self.pagename+'/'+self.archivepage
            # формируем шаблон для format()
            self.archivepage = (self.archivepage
                                .replace('%(год)', '{year}')
                                .replace('%(месяц)', '{month:02d}') # ноль в начале если < 10
                                .replace('%(полугодие)', '{halfyear}')
                                .replace('%(квартал)', '{quarteryear}')
                                .replace('%(номер)', '{number}'))
            if '{{number}}' in self.archivepage:
                self.numbering = True

        if self.markerdelay is None:
            self.markerdelay = self.changedelay

        if resultonly:
            self.markerdelay = self.changedelay
            self.changedelay = 9999 # архивировать только при наличии маркеров


        #self.add_marker('{{перенесено}}')


    def add_marker(self, marker):
        if marker not in self.markers:
            self.markers.append(marker)
        if marker.startswith('{{') and marker.endswith('}}'):
            # шаблон
            marker = marker[:-2]+'|'
            if marker not in self.markers:
                self.markers.append(marker)



months = 'января|февраля|марта|апреля|мая|июня|июля|августа|сентября|октября|ноября|декабря'
months_l = months.split('|')
date_r = '(\d\d):(\d\d), (\d\d?) ({}) (20\d\d) \(UTC\)'.format(months)
date_p = re.compile(date_r)

class Thread:
    def __init__(self, section: textlib.Section):
        self.title = section.title
        self.content = section.content
        self.startdate = None # дата создания трэда
        self.changedate = None # дата последнего сообщения
        # ищем таймстампы
        ts = []
        for m in date_p.finditer(section.content):
            month = months_l.index(m.group(4))+1
            # except ValueError:
            d = datetime(int(m.group(5)), month, int(m.group(3)),
                         int(m.group(1)), int(m.group(2)), tzinfo=timezone.utc)
            ts.append(d)

        if ts: # найден хотя бы один
            #self.startdate = min(ts) # самый ранний таймстамп
            self.startdate = ts[0] # первый встреченный таймстамп
            self.changedate = max(ts) # самый поздний таймстамп



def get_sections(sect):
    # объединяет разделы и возвращает список разделов (textlib.Section) второго уровня (==...==)
    ret = []
    prev = None
    for s in sect.sections:
        if s.level == 2:
            if prev:
                ret.append(prev)
            prev = s
        elif prev:
            # объединяем разделы
            prev = textlib.Section(prev.title, prev.content+s.title+s.content)
        else:
            prev = s
    if prev:
        ret.append(prev)
    return ret


def sort_thread(asect, athreads, toup):
    # сортируем трэды в архиве по дате создания
    athr = []
    for s in asect.sections:
        th = Thread(s)
        if not th.startdate:
            # startdate не найден - просто добавляем в конец/начало
            txt = ''.join(s.title+s.content for s in asect.sections)
            atxt = ''.join(s.title+s.content for s in athreads)
            if toup:
                # новые сверху
                txt = asect.header+atxt+txt+asect.footer
            else:
                txt = asect.header+txt+atxt+asect.footer
            return txt

        athr.append(th)

    # два разных алгоритма: (оба работают)
    if SORT_THREADS:
        # 1. сортируем
        athr += athreads
        athr.sort(key=lambda x: x.startdate, reverse=toup)
    else:
        # 2. вставляем в нужное место
        for a in athreads:
            for i in range(len(athr)):
                if ((toup and a.startdate > athr[i].startdate) or
                    (not toup and a.startdate < athr[i].startdate)):
                    athr.insert(i, a)
                    break
            else:
                if toup:
                    athr.insert(0, a)
                else:
                    athr.append(a)

    txt = asect.header+(''.join(s.title+s.content for s in athr))+asect.footer
    return txt


templ_del_p = re.compile('({{User:ClaymoreBot/Архивация.*?}})', flags=re.S|re.M)
def arch(conf: Config):
    # не обрабатывам страницы с неизвестными параметрами в конфиге
    if conf.unknown_parameter or conf.template_not_found:
        return False

    page = pywikibot.Page(site, conf.pagename)
    sect = textlib.extract_sections(page.text+'\n\n', site) # MediaWiki убирает \n в конце страницы
    sections = get_sections(sect)

    newtext = [] # новый текст страницы (список textlib.Section)
    archtext = [] # текст переносимый в архив (список Thread)

    # проверка на перенос в архив
    for s in sections:
        if conf.skipmarker and conf.skipmarker in s.content:
            newtext.append(s)
            continue

        th = Thread(s)
        if not th.startdate:
            # таймстампы не найдены
            #pywikibot.logging.warning('timestamps not found: '+th.title)
            newtext.append(s)
            continue

        for t in conf.deltext:
            th.content = th.content.replace(t, '')

        # FEXME: что делать, если в переносимом тексте есть {{User:ClaymoreBot/Архивация}}
        #th.content = templ_del_p.sub('', th.content)
        th.content = templ_del_p.sub(r'<nowiki>\1</nowiki>', th.content)
        #assert not templ_del_p.search(th.content) # найден шаблон {{User:ClaymoreBot/Архивация}}

        for mr in conf.markers:
            # ищем markers
            if mr in s.title or mr in s.content:
                if today - th.changedate >= timedelta(days=conf.markerdelay):
                    # архивируем спустя markerdelay
                    archtext.append(th)
                else:
                    # оставляем
                    newtext.append(s)
                break
        else:
            # markers не найдены, проверяем changedelay
            if today - th.changedate >= timedelta(days=conf.changedelay):
                # переносим в архив
                archtext.append(th)
            else:
                # оставляем
                newtext.append(s)


    if not archtext:
        # нечего архивировать
        return False

    # сохраняем архив
    pywikibot.logging.info('** Archiving: '+page.title())

    arch_pages = {} # ключ: страница архива (str), значение: список Thread

    if conf.numbering:
        # тип = нумерация
        num = 1
        pp = None
        while True: # ищем последний архив
            ap = pywikibot.Page(site, conf.archivepage.format(number=num))
            if not ap.exists():
                break
            pp = ap
            num += 1
        if pp:
            nsect = len(get_sections(textlib.extract_sections(pp.text, site))) # количество трэдов в последнем архиве
            atitle = pp.title()
        else:
            nsect = 0
            atitle = conf.archivepage.format(number=1)
        for th in archtext:
            pywikibot.logging.info('to arch: '+th.title)
            if nsect >= conf.numthreads:
                atitle = conf.archivepage.format(number=num)
                num += 1
            if atitle not in arch_pages:
                arch_pages[atitle] = [th]
            else:
                arch_pages[atitle].append(th)
            nsect += 1

    else:
        # тип не нумерация
        for th in archtext:
            pywikibot.logging.info('to arch: '+th.title)
            d = {'year': th.startdate.year,
                 'month': th.startdate.month,
                 'halfyear': 1 + int((th.startdate.month-1)/6),
                 'quarteryear': 1 + int((th.startdate.month-1)/3)}
            ap = conf.archivepage.format(**d)
            if ap not in arch_pages:
                arch_pages[ap] = [th]
            else:
                arch_pages[ap].append(th)

    for ap in arch_pages:
        # сохраняем архивы
        apage = pywikibot.Page(site, ap)
        assert apage.namespace() == page.namespace()
        if apage.exists():
            asect = textlib.extract_sections(apage.text+'\n\n', site) # MediaWiki убирает \n в конце страницы
            txt = sort_thread(asect, arch_pages[ap], conf.toup)
        else:
            txt = ''.join(s.title+s.content for s in arch_pages[ap])
            if conf.header:
                # добавляем заголовок в новый архив
                txt = conf.header+'\n\n'+txt

        message = 'Archiving ({})  [[{}]]'.format(len(arch_pages[ap]), conf.pagename)
        # save txt to apage
        pywikibot.logging.info('** save to arch: '+ap)
        pywikibot.logging.info('message: '+message)
        pywikibot.showDiff(apage.text, txt, context=2)
        if not TEST:
            apage.text = txt
            apage.save(message)

    # сохраняем страницу
    ap = list(arch_pages.keys())
    ap.sort()
    message = 'Archiving ({})  [[{}]]'.format(len(archtext), ']], [['.join(ap))
    txt = sect.header+''.join(s.title+s.content for s in newtext)+sect.footer

    # save txt to page
    pywikibot.logging.info('** save new txt: '+page.title())
    pywikibot.logging.info('message: '+message)
    pywikibot.showDiff(page.text, txt, context=2)
    if not TEST:
        page.text = txt
        page.save(message)

    return True



def main():

    if sys.argv[1:]:
        for pn in sys.argv[1:]:
            pywikibot.logging.info('do: '+pn)
            p = pywikibot.Page(site, pn)
            conf = Config(p)
            arch(conf)
            pywikibot.logging.info(pformat(conf.__dict__))

    else:
        tn = 0
        cn = 0
        un = 0
        upp = []

        tp = pywikibot.Page(site, config_template)
        for p in tp.getReferences(only_template_inclusion=True): # ссылки сюда
            if p.namespace() in (
                    'Википедия:',
                    'Обсуждение:',
                    'Обсуждение участника:',
                    'Обсуждение Википедии:',
                    'Обсуждение проекта:',
                    'Обсуждение шаблона:',
                    'Обсуждение категории:',
                    'Обсуждение портала:',
                    'Обсуждение модуля:',
            ):
                #pywikibot.logging.info('do: '+p.title())
                tn += 1
                try:
                    conf = Config(p)
                    if conf.unknown_parameter:
                        un += 1
                        upp.append(p.title())
                    if arch(conf):
                        pywikibot.logging.info(pformat(conf.__dict__))
                        cn += 1
                except KeyboardInterrupt:
                    break
                except:
                    err = traceback.format_exc()
                    pywikibot.logging.error(err)

                #if cn >= 50: break


        pywikibot.logging.info('''total: {}
changed: {}
unknown_parameter: {}'''.format(tn, cn, un))

        print(upp)


main()
Downgrade Counter