あぐちゃんさんの物置き場

オタク、ブレボを溶かしがち

推しの画像を集めてTLに持ってくるtwitterBOT Python3

めんどくさいが原動力

TLに推しの画像をかき集めるために絵師様のフォロー専門のアカウントを用意するのがめんどくさかったし、保存すればスマホのストレージも圧迫されるしで、普段使い用のアカウントのTLになんとかして自動で人気の絵ツイを持ってこれないかと考えました。なので、twitterAPIをしばいて、人気の画像をRTしてついでにローカルに保存するBOTを作りました。やったぜ。プログラムはどこのご家庭にもあるラズパイサーバーで毎日24時間動いています。現在、@agctoolとして稼働しています。好きなVtuberの画像を中心に集めているので、TLに放流したい方は是非。
BOTに対してメンションにコマンドをつけてツイートすることで、設定を追加したり、検索および回収&放流を手動で開始することができます。
例によって内容はアレなコードです。よくわかんないエラーが起きてexceptに引っかかってもpassでガン無視しちゃってるので、いつ死んでもおかしくありません(稼働始めてから半年間まだ死んだことないけど)。コードの使用は自己責任で。コピペで動くかどうかは知りません。悪しからず。保存した画像を他所のSNSに垂れ流したり配布等はしないように(規約に引っかかります)。

ファイルは4つあります。すべて同じ階層に配置しています。
・key :各種APIキーを保存するファイル
・config.json:設定ファイル
・saved :画像を保存済みのtweetIDを保存するファイル(最初は空のファイルを用意してください)
・app.py :BOT本体

上記のファイルと同じ階層にあるdata_storageというフォルダに保存した画像が順次格納されていきます。
仕組みを知りたかったら頑張ってクソ雑コードを読んでください。

key

{
    "consumer_key":"*************************",
    "consumer_secret":"************************************************",
    "access_token_key":"**************************************************",
    "access_token_secret":"*********************************************"
}

config.json

{
    "save_path": "data_storage/",
    "search": [
        {
            "keyword": "**************",
            "min_faves": 810,
            "download": true
        },
        {
            "keyword": "***********:",
            "min_faves": 114,
            "download": true
        },
        //以下同形式のデータ
        {
            "keyword": "*********",
            "min_faves": 514,
            "download": "true"
        }
    ]
}


saved

{
    "data": [
        {
            "id": *******************,
            "date": "2021-04-10"
        },
        {
            "id": *******************,
            "date": "2021-04-10"
        },
        //以下同形式のデータ
        {
            "id": *******************,
            "date": "2021-04-16"
        }
    ],
    "number": 11149
}

4/16日の時点で11149件の画像を保存しているらしい....

app.py

import tweepy
import json
import datetime
import requests
import threading
import time


#基本設定項目
#管理者のID
OPID = *******************
OPNAME = '[使いたい人のアカウントの名前 例:Agchan_Luice]'
BOTNAME = '[BOTにするアカウントの名前]'

#APIキー読み込み
kf = open('key',encoding='utf-8')
keys = json.load(kf)
kf.close()

#設定ファイル読み込み
cf = open('config.json',encoding='utf-8')
cfg = json.load(cf)
save_path = cfg['save_path']
cf.close()

#保存済み画像IDリスト読み込み
sf = open('saved',encoding='utf-8')
saved = json.load(sf)
sf.close()

#API認証
auth = tweepy.OAuthHandler(keys['consumer_key'], keys['consumer_secret'])
auth.set_access_token(keys['access_token_key'], keys['access_token_secret'])
api = tweepy.API(auth)



#画像収集及びリツイート
def collect():
    #日付取得
    date = datetime.date.today().strftime("%Y-%m-%d")
    todaytime = datetime.datetime.strptime(date,"%Y-%m-%d")
    limit = todaytime - datetime.timedelta(days=7)

    #設定ファイルから読み込み
    for word in cfg['search']:
        que = word['keyword'] + ' -RT min_faves:' + str(word['min_faves']) + ' since:' + limit.strftime("%Y-%m-%d")
        #print(que)
        #検索
        search_result = api.search(q = que)

        #写真を含むツイートのみ選択
        for result in search_result:

            #保存したかどうか確認
            isSavedTweet = False
            if 'media' in result.entities:
                for media in result.entities['media']:
                    if media['type'] == 'photo':
                        #保存済みでないツイートだけ選択
                        f = True
                        #f = False
                        for s in saved['data']:
                            if result.id == s['id']:
                                f = False
                        if f:
                            url = media['media_url_https']
                            #保存する
                            if word['download'] == 'true':
                                response = requests.get(url)
                                img = response.content
                                filename = save_path + date + '-' + str(saved['number']) + '.jpeg'
                                with open(filename,"wb") as f:
                                    f.write(img)
                                saved['number'] += 1
                            isSavedTweet = True

            if isSavedTweet:
                #セーブしたらリツイート
                try:
                    api.retweet(result.id)
                except:
                    pass
                d = {"id":result.id,"date":date}
                saved['data'].append(d)

    #一週間以前の古いデータを削除
    saved['data'] = [s for s in saved['data'] if datetime.datetime.strptime(s['date'],"%Y-%m-%d") > limit]
    #書き込み
    wd = json.dumps(saved,indent=4)
    ow = open("saved","w",encoding='utf-8')
    ow.write(wd)
    ow.close()


#管理者のツイートをストリーミング
class cmdlistener(tweepy.StreamListener):
    def on_data(self, data):
        #ツイート保存いらん
        #global dir_idx

        tweet = json.loads(data)
        if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない
            #save_path = "%s/%s.json" % ( query_dir, tweet["id_str"] )
            #f = open(save_path, "w")
            #json.dump(tweet, f)
            #f.close()
            if tweet['user']['id'] == OPID:
                cmdfunc(tweet)
            return True
        return True

    def on_error(self, status):
        #print( status )
        pass

def cmd_streaming():
    que = ['@' + BOTNAME]

    l = cmdlistener()

    def start_stream():
        while True :
            try:
                stream = tweepy.Stream(auth, l)
                stream.filter(track = que)
            except:
                pass
    #print( 'target query:', que )
    start_stream()

def reply(tweet,text:str):
    try:
        dst_tw_id = tweet['id']
    except:
        pass
    try:
        api.update_status(status = '@' + tweet['user']['screen_name'] + "\n" + text,in_reply_to_status_id = str(dst_tw_id))
    except:
        pass

def cmdfunc(tweet):
    cmd = tweet['text'].split()
    #print(cmd)
    try:
        if cmd[1] == 'srwadd':
            try:
                q = cmd[2]
                f = int(cmd[3])
                if type(q) != str:
                    reply(tweet,'error:invalid type' + type(q))
                    return -1
                if type(f) != int:
                    reply(tweet,'error:invalid type' + type(f))
                    return -1
                cft = open('config.json',"w",encoding='utf-8')
                tf = False
                if len(cmd) >= 5:
                    if cmd[4] == 'false':
                        tf = True
                if tf:
                    cfg['search'].append({'keyword':q,'min_faves':f,'download':'false'})
                else:
                    cfg['search'].append({'keyword':q,'min_faves':f,'download':'true'})
                cft.write(json.dumps(cfg,indent=4))
                cft.close()
                reply(tweet,"設定を追加しました。")
                return 0
            except:
                reply(tweet,"usage : srwadd \'queue\' \'min_faves\'")
                return -1
        if cmd[1] == 'searchm':
            reply(tweet,"更新を実行します。")
            collect()

        if cmd[1] == 'force-shutdown':
            reply(tweet,"強制終了します。")
            threading.stop()
    except:
        reply(tweet,"unknwon error occurred.")


#各スレッド設定
class once_h(threading.Thread):
    def __init__(self, thread_name):
        self.thread_name = str(thread_name)
        threading.Thread.__init__(self)

    def __str__(self):
        return self.thread_name

    def run(self):
        while True:
            try:
                collect()
            except:
                print('error?')
            #4時間に1回動作
            time.sleep(14400)

class cmd_getstream(threading.Thread):

    def __init__(self, thread_name):
        self.thread_name = str(thread_name)
        threading.Thread.__init__(self)

    def __str__(self):
        return self.thread_name

    def run(self):
        cmd_streaming()

def main():
    t1 = once_h(thread_name='time')
    t2 = cmd_getstream(thread_name='every')

    t1.start()
    t2.start()

    thread_list = [t1,t2]

    for thread in thread_list:
        thread.join()

main()

keyファイルには各種APIキーを記入します。savedファイルの中身は勝手に管理されます。期間を決めて検索するので、その期間中にすでに保存した画像を重複して保存しないためにあるファイルです。
config.jsonの中に、所定の形式で設定を書き込むと、その設定項目すべての検索を4時間に一度行います(あんまり検索項目多いと途中で止まるかも)。"keyword"キーには、検索したい文字列(例:#xxアート)を入れます。downloadの項目は、trueにしておくとRTと画像の保存をします。falseにすると、RTのみを行って画像は保存しません。min_favesの項目には、検索に引っ掛ける最低いいね数を0以上の数値で設定します。怒られそう。申し訳なさすぎるけどこれがないとヒット数多すぎるので勘弁して......。config.jsonの設定は、コマンドで追加できます。
例:srwadd #xxxArt 1000 true #xxxArtのうちいいねが1000以上で検索、画像の保存を有効(trueをfalseにすると画像が保存されずRTのみ行われます)
TL上でのコマンドは、app.pyの上の方で設定してるOPIDと名前が一致しているユーザーの言うことしか聞きません。コマンドは増やそうと思えば増やせるけどめんどくさくて放置しています。今のままで十分当初ほしかった機能を果たしてくれています。
みなさんも任意のBOTを書いてタイムラインを豊かにしましょう。

フーリエ変換の実用例でたまに見るアレ

プログラミングを習ってからというもの、数々のゴミのようなソースコードを量産して遊んできたわけですが、それらのガラクタの中にはそれなりに気に入っている物もあったりするわけです。雑にコードを書いてポイ捨てするときは、だいたいpythonを使います。今回はまず一つ目。

 

フーリエ変換で絵を周波数へ

これはおそらく去年に作ったものです。なんでも周波数に分解したくなる病気に一時期かかっていたので、その時に作りました。ソースコードは2つ。

データを生成するプログラム
from ctypes import windll, Structure, c_long, byref
import time
from datetime import datetime
import tkinter
from PIL import Image, ImageDraw
import copy
import json

datalist = []
posbuffer = []

print("Enter new data file name.")
fname = input()


class Application(tkinter.Frame):
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.master.title('tkinter canvas trial')
        self.pack()
        self.create_widgets()
        self.setup()

    def create_widgets(self):
        self.vr = tkinter.IntVar()
        self.vr.set(1)
        self.write_radio = tkinter.Radiobutton(self, text='write', variable=self.vr, value=1, command=self.change_radio)
        self.write_radio.grid(row=0, column=0)
        self.erase_radio = tkinter.Radiobutton(self, text='erase', variable=self.vr, value=2, command=self.change_radio)
        self.erase_radio.grid(row=0, column=1)

        self.clear_button = tkinter.Button(self, text='clear all and output data.', command=self.clear_canvas)
        self.clear_button.grid(row=0, column=2)

        self.save_button = tkinter.Button(self, text='save', command=self.save_canvas)
        self.save_button.grid(row=0, column=3)

        self.test_canvas = tkinter.Canvas(self, bg='white', width=700, height=700)
        self.test_canvas.grid(row=1, column=0, columnspan=4)
        self.test_canvas.bind('<B1-Motion>', self.paint)
        self.test_canvas.bind('<ButtonRelease-1>', self.reset)

    def setup(self):
        self.old_x = None
        self.old_y = None
        self.color = 'black'
        self.eraser_on = False
        self.im = Image.new('RGB', (700, 700), 'white')
        self.draw = ImageDraw.Draw(self.im)

    def change_radio(self):
        if self.vr.get() == 1:
            self.eraser_on = False
        else:
            self.eraser_on = True

    def clear_canvas(self):
        self.test_canvas.delete(tkinter.ALL)
        f = open('data\\' + fname + '.json',"w")
        if f == None:
            print('Write failed.')
            exit()
        
        bf = {}
        
        i = 0
        j = 0
        
        for s in datalist:
            for p in s:
                bf[str(i) + ':' + str(j)] = p
                j += 1
            j = 0
            i += 1
        bf['length'] = i - 1
        
        output = json.dumps(bf)
        f.write(output)
        
        f.close()
        exit()

    def save_canvas(self):
        self.test_canvas.postscript(file='out.ps', colormode='color')

    def paint(self, event):
        if self.eraser_on:
            paint_color = 'white'
        else:
            paint_color = 'black'
        if self.old_x and self.old_y:
            self.test_canvas.create_line(self.old_x, self.old_y, event.x, event.y, width=5.0, fill=paint_color, capstyle=tkinter.ROUND, smooth=tkinter.TRUE, splinesteps=36)
            self.draw.line((self.old_x, self.old_y, event.x, event.y), fill=paint_color, width=5)
        self.old_x = event.x
        self.old_y = event.y
        posbuffer.append([self.old_x,self.old_y])

    def reset(self, event):
        self.old_x, self.old_y = None, None
        tmp = copy.deepcopy(posbuffer)
        datalist.append(tmp)
        posbuffer.clear()
        


root = tkinter.Tk()
app = Application(master=root)
app.mainloop()

tkinterとPIL等を使って、線を描いたらそれをソースコードと同じ階層のdataというフォルダにjson形式で保存するプログラムがこれ。起動すると、お絵描きをすることができ、上のボタンを押すとデータを保存できます。
ドットを打つタイミングで座標データを回収してるみたいです。どこかのサイトからプログラムをコピペして、改造して作った記憶があります。データの保存の仕方があまりにも雑なので、このようなコードはあんまりおすすめしません。(絵を描くやつはとても助かりました。コピペ元へ感謝を。)

f:id:Agchan_Luice:20210415231742p:plain
絵を描く画面はこんな感じ
フーリエ変換して周波数に分解、sinとcosで戻す
import json
import copy
import cmath as cm
import numpy as np
import tkinter as tk
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.patches as patches

Picture_size = 700

def dft(data):
    #合ってんの?これ
    ret = []
    n = len(data)
    
    for k in range(0,n):
        w = cm.exp(-1j * cm.pi * 2.0 * k / float(n))
        s = 0
        
        for i in range(0,n):
            s += data[i] * (w ** i) / n * 2
        
        ret.append(s)
    
    return ret



#json読み取り
print('Enter read data name')

c = input()

try:
    f = open('data\\' + c + '.json','r')
except:
    print('Failed to open the file.')
    exit()

print("File loading.....")

rawdata = f.read()

f.close()

data = json.loads(rawdata)

pos = []
i = 0
j = 0

length = data['length']

breakflag = False

print('Converting data.....')

while True:
    while True:
        try:
            pos.append(data[str(i) + ':' + str(j)])
        except:
            if(length < i):breakflag = True
            if breakflag:break
            i += 1
            j = 0
            break
        j += 1
    if breakflag:break


#変換
x_ = []
y_ = []

for s in pos:
    x,y = s[0] - Picture_size // 2,Picture_size // 2 - s[1]
    x_.append(x)
    y_.append(y)

n = len(x_)


#LPF
lpf_rate = 1
lpx = [0] * (n - lpf_rate)
lpy = [0] * (n - lpf_rate)

for i in range(0,n - lpf_rate):
    for j in range(0,lpf_rate):
        lpx[i] += x_[i + j]
        lpy[i] += y_[i + j]
    lpx[i] /= lpf_rate
    lpy[i] /= lpf_rate

print("LPF rate is",lpf_rate,".")

N = n - lpf_rate


print("Number of data is",N)
print('Fourier transrating with DFT......')

#DFTで変換
Fx = np.fft.fft(lpx)
Fy = np.fft.fft(lpy)

print("Fourier transrating was successfully.")
print("Getting frequency list......")
#周波数リストを取得
x_freq = []
y_freq = []

for i in range(0,N // 2 - 1):
    x_freq.append(i + 1)
    y_freq.append(i + 1)

print("Data format")
#有効な範囲だけにぎゅむっと
Fx = Fx[1:Fx.size // 2]
Fy = Fy[1:Fy.size // 2]


print("Restorationing with cos and sin function....")
#周波数から復元
div = 300
N = Fx.size
x = [0] * div
y = [0] * div
#実部 * cos + |虚部| * sin
sin_table = []
cos_table = []
resol = 1600
for i in range(0,resol):
    sin_table.append(cm.sin(2 * cm.pi * i / resol))
    cos_table.append(cm.cos(2 * cm.pi * i / resol))

def fsin(t:int):
    if t > 0:
        return sin_table[int(t) % 1600]
    else:
        return -sin_table[int(abs(t)) % 1600]

def fcos(t:int):
    return cos_table[int(abs(t)) % 1600]


for i in range(0,N):
    for j in range(0,div):
        #x[j] += (Fx[i].real * cm.cos(x_freq[i] * 2 * cm.pi * (1 - j / div)) + int(Fx[i].imag) * cm.sin(x_freq[i] * 2 * cm.pi * (1 - j / div))) / N
        #y[j] += (Fy[i].real * cm.cos(y_freq[i] * 2 * cm.pi * (1 - j / div)) + int(Fy[i].imag) * cm.sin(y_freq[i] * 2 * cm.pi * (1 - j / div))) / N
        x[j] += (Fx[i].real * fcos(x_freq[i] * resol * (1 - j / div)) + (Fx[i].imag * fsin(x_freq[i] * resol * (1 - j / div)))) / N
        y[j] += (Fy[i].real * fcos(y_freq[i] * resol * (1 - j / div)) + (Fy[i].imag * fsin(y_freq[i] * resol * (1 - j / div)))) / N

#描画
fig = plt.figure()
ims = []
plt.axes().set_aspect('equal')

for i in range(0,div):
    #グラフ描画
    im = plt.plot(x[0:i],y[0:i],'b')
    
    ims.append(im)

ani = animation.ArtistAnimation(fig,ims,interval = 50)
plt.show()


#plt.plot(x,y)
#plt.show()

まず、データを読み込んだらx座標とy座標でそれぞれフーリエ変換を実施して、周波数成分に分解します。計算結果の実部がcos(偶関数)成分で、虚部がsin(奇関数)成分を構成しているので、それぞれ分けて計算した後に足して出力データにつなげていき、Plot上に出力しているようです。ふつうは各周波数のゲインを調べたいので、この複素数の絶対値を取るのが一般的です。リアルタイムの音声データをぶち込んで変換し、絶対値を取って周波数でプロットしてやると、高い音とか低い音とかで伸びる場所が違うバランみたいなメーターを再現できます(なんていうんだアレ)。

numpyのFFT機能を使っています(dftって表示してるけどおそらくデータ数をうまいこと調整してFFTしているはず)。フーリエ変換(FFT、DFT、STFTなど、特に音声解析)に特化したライブラリはどうやらたくさんあるようで、コピペだけでリアルタイム音声の変換ができるやつもあります。librosaとか結構よかったです(これのスペクトログラムを表示する機能を使って、特定の音のパターンに反応する女児向け玩具を騙すためにその周波数を解析して、マイコンとスピーカーを使って騙す遊びをしたけど動画取り損ねた)。最初にdft関数を定義していますが、めちゃくちゃ遅かったような気がします。結局numpy製に落ち着いたのでしょうか、当時のことはまるで覚えていません。でもちゃんと今でも動きます。すごい。(率直)

f:id:Agchan_Luice:20210415231955p:plainf:id:Agchan_Luice:20210415232118p:plain
一筆書きじゃないとき
f:id:Agchan_Luice:20210415233224p:plainf:id:Agchan_Luice:20210415233227p:plain
一筆書きの時

一筆書きだと元の絵がかなり再現できますが、一度でもマウスのクリック離したりすると座標に不連続な部分が発生するため、その部分が高周波成分がクソデカになるので、めちゃくちゃわかりやすく発振します(ギブス現象って言うらしい)。

書きたかったことは書いたのでこの辺で。

ロボコンで作ったやつ

作ったもの思い出そうの一つ目。

とりあえず、ロボコン部としての活動で作ったものを置いておこうと思ったけど、1年と2年の時に作ったものは写真が見つからないのでパス。あ、でも2年の冬に移動用のライブラリ作ったっけ。これは動画がありますね。

 

カチカチ言ってるのは当時のリレーモタドラです。

 プログラムの仕組みは秘密です。卒業したら原理の解説だけはするかも。

 

次。

 

3年の時のロボコンは2020年度。

https://www.cemedine.co.jp/cemedine_reports/kousenrobocon2020-kyusyuokinawa.html

ここに載ってる久留米のマシンの両方のプログラムをそれぞれ一部書きました。

Aチームはパソコンのペンタブのペンの位置と書いてる/書いてないをマシンに送信するプログラムを、Bチームは首の制御とマシン全体の制御と回路を担当しました。

首の制御の動画ありました。確かarcsinあたりで近似して動かしてたはず。

 本編はロボコンの映像がyoutubeに公式から上がってるはずなのでそっち見てください。

 

表立ってロボコンでやったことといえばこのくらいでしょうか。

飽きたのでおわり。

はつかきこ

自分用の掃き溜めです。読んでもためにならないと思います。たぶん。

何やってきたか自分でもよくわかんなくなってきたので、物置き場として活用します。めんどくさいので適当にしか書きません。

これまで作ってきた物のうち、記憶と記録があるものを一通り掘り出したら、進行中のお遊びを置いていく予定。気分が乗ったらちまちま書きます。

f:id:Agchan_Luice:20210413224216j:plain

これは飯テロです。