マインクラフトは手ごろなCPUエミュ
ロマンを求めて
CPUを自作してみたいと思ったら、ロジックICを組み合わせて作るのが普通かと思うのですが、そこまでするお金と気力がありませんでした。そこで使ったのがマインクラフトというゲームです。もとよりさんざん遊んでいたこのゲーム、実はゲーム内で論理回路を作ることができます。論理回路というよりは順序回路ですが。
ということで、ゲーム内でCPUを作ってみました。いつか作ってみたいと思っていたんですよね。
全体像がこちら。
4bitアーキテクチャです。とりあえず一個作ろうと思って作ったものです。一回作るとコツが見えてくるので、二回目またすぐ作ることにしました。それがこちら。
こちらは8bitアーキテクチャです。明らかに巨大化しています。一つ目は制作に一週間かかったのに、2つ目は2~3日程度で完成しました(結局はコピペで同じモジュールの量産なので規模自体はあんまり関係ない)。仕組みの解説はこっちでやります。一個目はアーキテクチャがあまりにもアレだったので...
2個目の方のアーキテクチャがこちら。図が汚いけど我慢してください。
とりあえず動けばよいという構成です。
仕組み
CPUを構成している部品は主にこれら8つのユニットです。
・プログラムメモリ
・命令カウンタ
・命令デコーダ
・Wレジスタ
・RAM汎用レジスタ
・ALU
・条件分岐
・GPIO(General Purpose Input/Output)
一つづつ解説します。
・プログラムメモリ
一番サイズ的に大きなユニットです。いわゆる機械語をここに順番に書きこんでいきます。1命令14bitとなっています。00011010011010みたいな感じで並べていきます。レッドストーントーチというブロックを手作業で一つづつ設置してビットを立ち上げていく必要があるので、プログラムの書き込みはかなりの重労働です。おかげでこのCPUは機能としてはいろいろできる割に、書き込むのが大変すぎて一番長いコードでも8byte型の足し算くらいしかしていません。かわいそう
・命令カウンタ
プログラムメモリから読み出す命令のアドレスをカウントする部分です。ジャンプ命令が来ると、指定のアドレスまで飛びます。また、条件分岐の命令で評価が真だった場合は条件分岐のモジュールから信号が飛んできて、それをトリガにして命令を1つ飛ばします。飛んだ先にジャンプ命令を置いておくことで、if文みたいな動作をできます。
・命令デコーダ
プログラムメモリから吐き出された命令を各モジュールへと分配する回路です。要はただのデマルチプレクサなわけですが。上3桁の数値をアドレスにして分けています。
・Wレジスタ
いわゆるレジスタです。RとLとの二つの1byte分のメモリがあり、それぞれ読み書きができます(以後WR、WLと表記)。これがモジュール間でデータをやりとりするためのバッファとなっています。
・RAM汎用レジスタ
ランダムアクセスメモリ、通称RAMです。よくPCのスペックに書かれているアレです。CPUの部品の一つとして数えるのはちょっと違うような気もしますが必ず必要なので。
レジスタです。作った時は勘違いしてましたが、アドレスを変数で管理できないのでこれは汎用レジスタとなります。
WRもしくはWLの値を読んで格納したり、またWRもしくはWLに書き込めます。容量はなんと16byte分しかありません。これでも一個目のCPUより4倍容量が増えたんです。
追記(2022.6.3)
次世代CPUを作成中ですが、こちらにはレジスタとは別に256byteのRAMを搭載予定です。いわゆるポインタが利用できるようになるので、配列を用いた繰り返し処理が可能になる予定です。
256byteRAM pic.twitter.com/tkAz4BeqAv
— あぐちゃんさん❄️ (@Agchan_Luice) 2022年6月2日
・ALU
ALUはArithmetic and Logic Unit(算術論理演算装置)の略で、CPUが実際に計算を行う部分です。Wレジスタの値を使って計算します。結果はWRに格納されます。機能は以下7つです。
・NOT
・AND
・OR
・XOR
・右シフト
・左シフト
・加算
シフトや加算でオーバーフローが発生すると、WLの最下位ビットが立ち上がります。
・条件分岐
いわゆるif文を実現するための回路です。Wレジスタの値を使って評価します。使える条件は以下3つです。
・WRとWLの完全一致
・WRが正か負か(最上位ビットが立ち上がっているか否か)
・0以外か
評価した結果が真になると、前述したように命令カウンタに信号が送られて、一つ命令が飛ばされます。偽だった場合には何も起こりません。
・GPIO(General Purpose Input/Output)
GPIOは汎用入出力ポートです。指定した番号の入力ポートの値がWRに格納できます。また、指定した番号の出力ポートにWRの値を出力できます。これで外部との値のやり取りが可能になります。その気になればディスプレイとかキーボードを接続とかも可能ではあるのですが、動作がめちゃくちゃ遅いのは自明です。
気が付いたらオセロAIを書くことになっていた話
言い出しっぺが責任を持ちます
たぶん去年、授業でPythonを扱うことになり、そのうちにグループで何かしらを作って発表することになったことがありました。軽率に「オセロでもやらね?」って言ったら本当にオセロをやることになり、言い出しっぺである僕がAIを書くことになりました。最初は「まあちょろいだろ」って思ってたけど、軽く調べたら不安になるくらい難しいことばかり書いてありました。もうどうでもよくなったので(よくない)、とりあえず思い付きのコードで実装してみた。そしたらなんかそれなりに上手いこと動いたのでよかった(ほんまか?)。丸一日かけて書いたからしんどかった記憶がある。
自分の担当はAIだけだったので、手順決めや表示等はほかの人がやっています。他人のコードを晒すわけにはいかないので、自分が書いたAIとオセロのシステムのコードだけ公開します。ソースコードは以下2つ。
・example.py:使用例
・Reversi.py :システムとAI本体
ソースコード
example.py
import Reversi #インスタンス生成 a = Reversi.reversi(30,8,'black') while(1): #盤面表示 a.ShowBoardPic(a.table) #プレイヤーの手番 a.PlayerTurn() #ゲーム終了かどうか確認 if a.EndGameCheck():break #AIの手番 a.AITurn() #ゲーム終了かどうか確認 if a.EndGameCheck():break
Reversi.py
import sys import copy import random from PIL import Image,ImageDraw import re #角の追加点 edge_additional = 10 #敵の角の減点 enemy_edge_additional = -10 #辺の追加点 side_additional = 1 #角のL字の追加点 edgel_additional = 3 #外より位置マス内側の減点 outline_additional = -3 #未使用 sideenemy_additional = -3 class reversi: lv = 1 table = [[]] n = 8 side = 0 isVisiblePoint = False def __init__(self,level:int,size:int,AIside:str): #引数エラー処理群 if(level <= 1): print("Error:Make sure the level is 1 or higher.") sys.exit() if(size % 2 == 1): print("Error:Make sure size is even number.") sys.exit() if(4 > size): print("Error:Make sure minimum size is 6.") sys.exit() isTrueSide = False if(AIside == 'black'):isTrueSide = True if(AIside =='white'):isTrueSide = True if isTrueSide == False: print("Error:Make sure AI side is \"black\" or \"white\".") #各情報出力 print("AI level is",level,"\nsize is",size,"×",size) #各値代入 self.n = size self.lv = level if(AIside=='white'):self.side = 1 if(AIside=='black'):self.side = -1 self.table = [[0] * size for i in range(size)] #テーブル初期化 白1 黒-1 for i in range(self.n): for j in range(self.n): if (i == size / 2 - 1)&(j == size / 2 - 1): self.table[i][j] = 1 self.table[i][j+1] = -1 self.table[i+1][j] = -1 self.table[i+1][j+1] = 1 def VisiblePoint(self,a:bool):self.isVisiblePoint = a #盤面の様子をjpg出力 def GenerateBoardPic(self,name:str,board:list): im = Image.new('RGB', (self.n * 100, self.n * 100), (0, 128, 0)) draw = ImageDraw.Draw(im) for i in range(self.n - 1): draw.line(((i + 1) * 100,0,(i + 1) * 100,self.n * 100),width = 3,fill=(0,0,0)) for i in range(self.n - 1): draw.line((0,(i + 1) * 100,self.n * 100,(i + 1) * 100),width = 3,fill=(0,0,0)) for i in range(self.n): for j in range(self.n): if(board[i][j]==1): draw.ellipse((i * 100 + 5,j * 100 + 5,i * 100 + 95,j * 100 + 95),width = 3,fill=(255,255,255),outline=(0,0,0)) if(board[i][j]==-1): draw.ellipse((i * 100 + 5,j * 100 + 5,i * 100 + 95,j * 100 + 95),width = 3,fill=(0,0,0),outline=(0,0,0)) for i in range(0,self.n): for j in range(0,self.n): draw.text((i * 100 + 40,j * 100 + 40),str((i,j)),fill=(128,0,0)) im.save(name) #盤面の様子を標準機能で表示 def ShowBoardPic(self): board = self.table im = Image.new('RGB', (self.n * 100, self.n * 100), (0, 128, 0)) draw = ImageDraw.Draw(im) for i in range(self.n - 1): draw.line(((i + 1) * 100,0,(i + 1) * 100,self.n * 100),width = 3,fill=(0,0,0)) for i in range(self.n - 1): draw.line((0,(i + 1) * 100,self.n * 100,(i + 1) * 100),width = 3,fill=(0,0,0)) for i in range(self.n): for j in range(self.n): if(board[i][j]==1): draw.ellipse((i * 100 + 5,j * 100 + 5,i * 100 + 95,j * 100 + 95),width = 3,fill=(255,255,255),outline=(0,0,0)) if(board[i][j]==-1): draw.ellipse((i * 100 + 5,j * 100 + 5,i * 100 + 95,j * 100 + 95),width = 3,fill=(0,0,0),outline=(0,0,0)) for i in range(0,self.n): for j in range(0,self.n): draw.text((i * 100 + 40,j * 100 + 40),str((i,j)),fill=(128,0,0)) im.show() def OneLinerFromAdditionalPoint(self,s:int,line:list): isToSearch = True if len(line) < 3:isToSearch = False if(len(line)!=1): if line[1] != s * (-1):isToSearch = False ans = 0 if isToSearch: #指定色基準に反転 for i in range(0,len(line)): line[i] *= s #追加点の計算 for i in range(1,len(line) - 1): #0以外,0の並びが見つかったら追加点0にしてやめる if (line[i]!=0)&(line[i + 1] == 0): ans = 0 break #-1,1の並びを探す ans += 1 if(line[i]==-1)&(line[i + 1] == 1): break #最後まで移動したら追加点0 if i==len(line) - 2: ans = 0 break return ans #配置可能な場所か確認(0) 返り値:(追加取得マス数,[方向]) def isTruePos(self,x:int,y:int,board:list,col:str): #引数確認 if(x > self.n - 1):return [0,None] if(y > self.n - 1):return [0,None] s = 0 if(col == 'white'):s = 1 if(col == 'black'):s = -1 if(s==0):return [0,None] #既におかれているなら0 if(board[x][y] != 0) : return [0,None] #総計用 AddPoint = 0 #方向 direction = [] #右方向 line = [] for i in range(x,self.n): line.append(board[i][y]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('right') #左方向 line = [] for i in range(0,x + 1): line.append(board[x-i][y]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('left') #上方向 line = [] for i in range(0,y + 1): line.append(board[x][y-i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('upper') #下方向 line = [] for i in range(y,self.n): line.append(board[x][i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('downer') #右斜め上方向 line = [] for i in range(0,self.n): if((x + i > self.n - 1)|(y - i < 0)):break line.append(board[x + i][y - i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('right_upper') #右斜め下方向 line = [] for i in range(0,self.n): if((x + i > self.n - 1)|(y + i > self.n - 1)):break line.append(board[x + i][y + i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('right_downer') #左斜め上方向 line = [] for i in range(0,self.n): if((x - i < 0)|(y - i < 0)):break line.append(board[x - i][y - i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('left_upper') #左斜め下方向 line = [] for i in range(0,self.n): if((x - i < 0)|(y + i > self.n - 1)):break line.append(board[x - i][y + i]) tmp = self.OneLinerFromAdditionalPoint(s,line) AddPoint += tmp if tmp !=0:direction.append('left_downer') return [AddPoint,direction] def TablePoint(self,x:int,y:int,board:list,col:str): t = 0 myside = 1 if col == 'white' else -1 enemyside = -myside #数 for i in range(0,self.n): for j in range(0,self.n): if board[i][j] == myside: t += 1 #辺 for k in range(0,self.n): if board[0][k] == myside: t += side_additional for k in range(0,self.n): if board[k][0] == myside: t += side_additional for k in range(0,self.n): if board[self.n - 1][k] == myside: t += side_additional for k in range(0,self.n): if board[k][self.n - 1] == myside: t += side_additional #角 if board[0][0] == myside:t += edge_additional if board[self.n - 1][0] == myside:t += edge_additional if board[0][self.n - 1] == myside:t += edge_additional if board[self.n - 1][self.n - 1] == myside:t += edge_additional #辺より1マス内側に置こうとする場合 if x == 1:t += outline_additional if y == 1:t += outline_additional if x == self.n - 2:t += outline_additional if y == self.n - 2:t += outline_additional #敵が角を取った場合 if board[0][0] == enemyside:t += enemy_edge_additional if board[self.n - 1][0] == enemyside:t += enemy_edge_additional if board[0][self.n - 1] == enemyside:t += enemy_edge_additional if board[self.n - 1][self.n - 1] == enemyside:t += enemy_edge_additional return t def MaxPoint(self,board:list,col:str): pos = [] index = 0 enemyside = -1 if col == 'white' else 1 myside = -enemyside maximum = [0,0,-999] for i in range(0,self.n): for j in range(0,self.n): t = self.isTruePos(i,j,board,col)[0] if t == 0:continue t = 0 for k in range(0,self.n): for l in range(0,self.n): if self.table[k][l] == myside:t += 1 #得点計算用盤面 tmpb = copy.deepcopy(board) self.BoardUpdate(i,j,col,tmpb) t += self.TablePoint(i,j,tmpb,col) pos.append([i,j,t]) if index == 0:maximum = [i,j,t] index += 1 for i in range(0,len(pos)): if(pos[i][2]>maximum[2]):maximum = pos[i] return maximum def FlipOnLine(self,line,col): s = 1 if col == 'white' else -1 for i in range(len(line)): #同色がきたら抜ける if line[i] == s:break #異色がきたら反転 if line[i] == -s:line[i] = s return line def BoardUpdate(self,x,y,col:str,board:list): p = self.isTruePos(x,y,board,col) if(p[0] < 1):return for s in p[1]: if s== 'right': line = [] for i in range(x,self.n): line.append(board[i][y]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x + i][y] = line[i] if s == 'left': line = [] for i in range(0,x): line.append(board[x - i][y]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x - i][y] = line[i] if s == 'upper': line = [] for i in range(0,y): line.append(board[x][y - i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x][y - i] = line[i] if s == 'downer': line = [] for i in range(y,self.n): line.append(board[x][i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x][y + i] = line[i] if s == 'right_upper': line = [] for i in range(0,self.n): if((x + i > self.n - 1)|(y - i < 0)):break line.append(board[x + i][y - i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x + i][y - i] = line[i] if s == 'right_downer': line = [] for i in range(0,self.n): if((x + i > self.n - 1)|(y + i > self.n - 1)):break line.append(board[x + i][y + i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x + i][y + i] = line[i] if s == 'left_upper': line = [] for i in range(0,self.n): if((x - i < 0)|(y - i < 0)):break line.append(board[x - i][y - i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x - i][y - i] = line[i] if s == 'left_downer': line = [] for i in range(0,self.n): if((x - i < 0)|(y + i > self.n - 1)):break line.append(board[x - i][y + i]) line = self.FlipOnLine(line,col) for i in range(0,len(line)): board[x - i][y + i] = line[i] #指定地点に自色を置く board[x][y] = 1 if col=='white' else -1 def NextMaxPointPos(self,board:list,col:str): enemycol = 'black' if col == 'white' else 'white' pos = [] index = 0 #候補地に設置 for ai in range(0,self.n): for aj in range(0,self.n): atmp = copy.deepcopy(board) a = self.isTruePos(ai,aj,atmp,col)[0] if a == 0:continue self.BoardUpdate(ai,aj,col,atmp) #敵の最善手で進める epos = self.MaxPoint(atmp,enemycol) self.BoardUpdate(epos[0],epos[1],enemycol,atmp) #次の手の得点を算出 ptmp = self.MaxPoint(atmp,col) #得点と座標を格納 pos.append([ai,aj,ptmp[2] - epos[2]]) #得点で昇順ソート for i in range(0,len(pos) - 1): for j in range(i,len(pos)): if pos[i][2] < pos[j][2]: tmp = pos[i] pos[i] = pos[j] pos[j] = tmp #同得点のものからランダム選出 mp = pos[0][2] index = 0 for i in range(1,len(pos)): if mp != pos[i][2]: break index += 1 for i in range(0,index): a = random.randint(0,index - 1) b = random.randint(0,index - 1) tmp = pos[a] pos[a] = pos[b] pos[b] = tmp return (pos[0][0],pos[0][1]) def isAbleToPut(self,board:list,col:str): for i in range(0,self.n): for j in range(0,self.n): if self.isTruePos(i,j,board,col)[0] != 0:return True return False def ReadLvLoop(self,level,board:list,col:str): #一手先を出す pos = self.NextMaxPointPos(board,col) if self.isAbleToPut(board,col): #これ以上先を読めなければ、座標を返して再帰ループから抜ける return (pos[0],pos[1]) #一手先の盤面をつくる tmpb = copy.deepcopy(board) self.BoardUpdate(pos[0],pos[1],col,tmpb) if level == 1: #level - 1回繰り返したら次の手を返して再帰ループから抜ける return (pos[0],pos[1]) print(level) #一手先の盤面を現在の盤面、level - 1として再帰呼び出し return self.ReadLvLoop(level - 1,tmpb,col) #AIのターン def AITurn(self): self.EndGameCheck() mycol = 'white' if self.side == 1 else 'black' if self.isAbleToPut(self.table,mycol) == False: print('AI passed') return fs = self.ReadLvLoop(self.lv,self.table,mycol) self.BoardUpdate(fs[0],fs[1],mycol,self.table) print(fs) if self.isVisiblePoint == True: a = self.TablePoint(fs[0],fs[1],self.table,mycol) print(a) #プレイヤーのターン def PlayerTurn(self): self.EndGameCheck() mycol = 'white' if self.side == -1 else 'black' if self.isAbleToPut(self.table,mycol) == False: print('You passed') return x = 0 y = 0 while(1): print('x y') tmp = input() chk = re.search(r'\d\s\d',tmp) if chk == None: print("Invalid!") continue i = chk.group(0).split() x = int(i[0]) y = int(i[1]) a = self.isTruePos(x,y,self.table,mycol)[0] if a == 0: print("Invalid!") continue break self.BoardUpdate(x,y,mycol,self.table) if self.isVisiblePoint == True: a = self.TablePoint(x,y,self.table,mycol) print(a) #ゲームが終了したかどうか判定 def EndGameCheck(self): if (self.isAbleToPut(self.table,'white') == False) & (self.isAbleToPut(self.table,'black') == False): print('GAME OVER!') w = 0 b = 0 for i in range(0,self.n): for j in range(0,self.n): if self.table[i][j] == 1: w += 1 if self.table[i][j] == -1: b += 1 print('WHITE:',w,' BLACK:',b) return True return False
3,2\n とか打つと、左から4つ目、上から3つ目の位置に自分の色を置けます。
このまま動かすと、デバッグ用に用意した盤面の画像が別窓で表示されます。画像データだけを出力もできるので、tkinterとかで適当にGUIに表示するのがいいかもしれません。
仕組み
AIのカラクリとしては割と単純で、盤面の状態を点数化して、その点数が高くなるように指していきます。点数は、盤面に自分の色が増えるほど高くなり、また角や辺をたくさん取れるとより点数が高くなるようにしています。AIは、プレイヤーも常に点数が高くなるように指してくると想定して盤面を読み進めます。これをすべてのマスに対して行い、一定手数先まで読んだときに一番点数が高かったマスに指します。レベルの数値は、この先読みする手数となっています。
まとめ
クラスメイトを数人ボコボコにできたので非常に満足です。
2つのシステムで盤面を共有できるので、やろうと思えばプレイヤー vsプレイヤーとか、AI vs AIもできます。
推しの画像を集めてTLに持ってくるtwitterBOT Python3
めんどくさいが原動力
TLに推しの画像をかき集めるために絵師様のフォロー専門のアカウントを用意するのがめんどくさかったし、保存すればスマホのストレージも圧迫されるしで、普段使い用のアカウントのTLになんとかして自動で人気の絵ツイを持ってこれないかと考えました。なので、twitterのAPIをしばいて、人気の画像をRTしてついでにローカルに保存するBOTを作りました。やったぜ。プログラムはどこのご家庭にもあるラズパイサーバーで毎日24時間動いています。現在、@agctoolとして稼働しています。好きなVtuberの画像を中心に集めているので、TLに放流したい方は是非。
BOTに対してメンションにコマンドをつけてツイートすることで、設定を追加したり、検索および回収&放流を手動で開始することができます。
例によって内容はアレなコードです。よくわかんないエラーが起きてexceptに引っかかってもpassでガン無視しちゃってるので、いつ死んでもおかしくありません(稼働始めてから半年間まだ死んだことないけど)。コードの使用は自己責任で。コピペで動くかどうかは知りません。悪しからず。保存した画像を他所のSNSに垂れ流したり配布等はしないように(規約に引っかかります)。
ファイルは4つあります。すべて同じ階層に配置しています。
・key :各種APIキーを保存するファイル
・config.json:設定ファイル
・saved :画像を保存済みのtweetIDを保存するファイル(最初は空のファイルを用意してください)
・app.py :BOT本体
上記のファイルと同じ階層にあるdata_storageというフォルダに保存した画像が順次格納されていきます。
仕組みを知りたかったら頑張ってクソ雑コードを読んでください。
key
{ "consumer_key":"*************************", "consumer_secret":"************************************************", "access_token_key":"**************************************************", "access_token_secret":"*********************************************" }
config.json
{ "save_path": "data_storage/", "search": [ { "keyword": "**************", "min_faves": 810, "download": true }, { "keyword": "***********:", "min_faves": 114, "download": true }, //以下同形式のデータ { "keyword": "*********", "min_faves": 514, "download": "true" } ] }
saved
{ "data": [ { "id": *******************, "date": "2021-04-10" }, { "id": *******************, "date": "2021-04-10" }, //以下同形式のデータ { "id": *******************, "date": "2021-04-16" } ], "number": 11149 }
4/16日の時点で11149件の画像を保存しているらしい....
app.py
import tweepy import json import datetime import requests import threading import time #基本設定項目 #管理者のID OPID = ******************* OPNAME = '[使いたい人のアカウントの名前 例:Agchan_Luice]' BOTNAME = '[BOTにするアカウントの名前]' #APIキー読み込み kf = open('key',encoding='utf-8') keys = json.load(kf) kf.close() #設定ファイル読み込み cf = open('config.json',encoding='utf-8') cfg = json.load(cf) save_path = cfg['save_path'] cf.close() #保存済み画像IDリスト読み込み sf = open('saved',encoding='utf-8') saved = json.load(sf) sf.close() #API認証 auth = tweepy.OAuthHandler(keys['consumer_key'], keys['consumer_secret']) auth.set_access_token(keys['access_token_key'], keys['access_token_secret']) api = tweepy.API(auth) #画像収集及びリツイート def collect(): #日付取得 date = datetime.date.today().strftime("%Y-%m-%d") todaytime = datetime.datetime.strptime(date,"%Y-%m-%d") limit = todaytime - datetime.timedelta(days=7) #設定ファイルから読み込み for word in cfg['search']: que = word['keyword'] + ' -RT min_faves:' + str(word['min_faves']) + ' since:' + limit.strftime("%Y-%m-%d") #print(que) #検索 search_result = api.search(q = que) #写真を含むツイートのみ選択 for result in search_result: #保存したかどうか確認 isSavedTweet = False if 'media' in result.entities: for media in result.entities['media']: if media['type'] == 'photo': #保存済みでないツイートだけ選択 f = True #f = False for s in saved['data']: if result.id == s['id']: f = False if f: url = media['media_url_https'] #保存する if word['download'] == 'true': response = requests.get(url) img = response.content filename = save_path + date + '-' + str(saved['number']) + '.jpeg' with open(filename,"wb") as f: f.write(img) saved['number'] += 1 isSavedTweet = True if isSavedTweet: #セーブしたらリツイート try: api.retweet(result.id) except: pass d = {"id":result.id,"date":date} saved['data'].append(d) #一週間以前の古いデータを削除 saved['data'] = [s for s in saved['data'] if datetime.datetime.strptime(s['date'],"%Y-%m-%d") > limit] #書き込み wd = json.dumps(saved,indent=4) ow = open("saved","w",encoding='utf-8') ow.write(wd) ow.close() #管理者のツイートをストリーミング class cmdlistener(tweepy.StreamListener): def on_data(self, data): #ツイート保存いらん #global dir_idx tweet = json.loads(data) if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない #save_path = "%s/%s.json" % ( query_dir, tweet["id_str"] ) #f = open(save_path, "w") #json.dump(tweet, f) #f.close() if tweet['user']['id'] == OPID: cmdfunc(tweet) return True return True def on_error(self, status): #print( status ) pass def cmd_streaming(): que = ['@' + BOTNAME] l = cmdlistener() def start_stream(): while True : try: stream = tweepy.Stream(auth, l) stream.filter(track = que) except: pass #print( 'target query:', que ) start_stream() def reply(tweet,text:str): try: dst_tw_id = tweet['id'] except: pass try: api.update_status(status = '@' + tweet['user']['screen_name'] + "\n" + text,in_reply_to_status_id = str(dst_tw_id)) except: pass def cmdfunc(tweet): cmd = tweet['text'].split() #print(cmd) try: if cmd[1] == 'srwadd': try: q = cmd[2] f = int(cmd[3]) if type(q) != str: reply(tweet,'error:invalid type' + type(q)) return -1 if type(f) != int: reply(tweet,'error:invalid type' + type(f)) return -1 cft = open('config.json',"w",encoding='utf-8') tf = False if len(cmd) >= 5: if cmd[4] == 'false': tf = True if tf: cfg['search'].append({'keyword':q,'min_faves':f,'download':'false'}) else: cfg['search'].append({'keyword':q,'min_faves':f,'download':'true'}) cft.write(json.dumps(cfg,indent=4)) cft.close() reply(tweet,"設定を追加しました。") return 0 except: reply(tweet,"usage : srwadd \'queue\' \'min_faves\'") return -1 if cmd[1] == 'searchm': reply(tweet,"更新を実行します。") collect() if cmd[1] == 'force-shutdown': reply(tweet,"強制終了します。") threading.stop() except: reply(tweet,"unknwon error occurred.") #各スレッド設定 class once_h(threading.Thread): def __init__(self, thread_name): self.thread_name = str(thread_name) threading.Thread.__init__(self) def __str__(self): return self.thread_name def run(self): while True: try: collect() except: print('error?') #4時間に1回動作 time.sleep(14400) class cmd_getstream(threading.Thread): def __init__(self, thread_name): self.thread_name = str(thread_name) threading.Thread.__init__(self) def __str__(self): return self.thread_name def run(self): cmd_streaming() def main(): t1 = once_h(thread_name='time') t2 = cmd_getstream(thread_name='every') t1.start() t2.start() thread_list = [t1,t2] for thread in thread_list: thread.join() main()
keyファイルには各種APIキーを記入します。savedファイルの中身は勝手に管理されます。期間を決めて検索するので、その期間中にすでに保存した画像を重複して保存しないためにあるファイルです。
config.jsonの中に、所定の形式で設定を書き込むと、その設定項目すべての検索を4時間に一度行います(あんまり検索項目多いと途中で止まるかも)。"keyword"キーには、検索したい文字列(例:#xxアート)を入れます。downloadの項目は、trueにしておくとRTと画像の保存をします。falseにすると、RTのみを行って画像は保存しません。min_favesの項目には、検索に引っ掛ける最低いいね数を0以上の数値で設定します。怒られそう。申し訳なさすぎるけどこれがないとヒット数多すぎるので勘弁して......。config.jsonの設定は、コマンドで追加できます。
例:srwadd #xxxArt 1000 true #xxxArtのうちいいねが1000以上で検索、画像の保存を有効(trueをfalseにすると画像が保存されずRTのみ行われます)
TL上でのコマンドは、app.pyの上の方で設定してるOPIDと名前が一致しているユーザーの言うことしか聞きません。コマンドは増やそうと思えば増やせるけどめんどくさくて放置しています。今のままで十分当初ほしかった機能を果たしてくれています。
みなさんも任意のBOTを書いてタイムラインを豊かにしましょう。
フーリエ変換の実用例でたまに見るアレ
プログラミングを習ってからというもの、数々のゴミのようなソースコードを量産して遊んできたわけですが、それらのガラクタの中にはそれなりに気に入っている物もあったりするわけです。雑にコードを書いてポイ捨てするときは、だいたいpythonを使います。今回はまず一つ目。
フーリエ変換で絵を周波数へ
これはおそらく去年に作ったものです。なんでも周波数に分解したくなる病気に一時期かかっていたので、その時に作りました。ソースコードは2つ。
データを生成するプログラム
from ctypes import windll, Structure, c_long, byref import time from datetime import datetime import tkinter from PIL import Image, ImageDraw import copy import json datalist = [] posbuffer = [] print("Enter new data file name.") fname = input() class Application(tkinter.Frame): def __init__(self, master=None): super().__init__(master) self.master = master self.master.title('tkinter canvas trial') self.pack() self.create_widgets() self.setup() def create_widgets(self): self.vr = tkinter.IntVar() self.vr.set(1) self.write_radio = tkinter.Radiobutton(self, text='write', variable=self.vr, value=1, command=self.change_radio) self.write_radio.grid(row=0, column=0) self.erase_radio = tkinter.Radiobutton(self, text='erase', variable=self.vr, value=2, command=self.change_radio) self.erase_radio.grid(row=0, column=1) self.clear_button = tkinter.Button(self, text='clear all and output data.', command=self.clear_canvas) self.clear_button.grid(row=0, column=2) self.save_button = tkinter.Button(self, text='save', command=self.save_canvas) self.save_button.grid(row=0, column=3) self.test_canvas = tkinter.Canvas(self, bg='white', width=700, height=700) self.test_canvas.grid(row=1, column=0, columnspan=4) self.test_canvas.bind('<B1-Motion>', self.paint) self.test_canvas.bind('<ButtonRelease-1>', self.reset) def setup(self): self.old_x = None self.old_y = None self.color = 'black' self.eraser_on = False self.im = Image.new('RGB', (700, 700), 'white') self.draw = ImageDraw.Draw(self.im) def change_radio(self): if self.vr.get() == 1: self.eraser_on = False else: self.eraser_on = True def clear_canvas(self): self.test_canvas.delete(tkinter.ALL) f = open('data\\' + fname + '.json',"w") if f == None: print('Write failed.') exit() bf = {} i = 0 j = 0 for s in datalist: for p in s: bf[str(i) + ':' + str(j)] = p j += 1 j = 0 i += 1 bf['length'] = i - 1 output = json.dumps(bf) f.write(output) f.close() exit() def save_canvas(self): self.test_canvas.postscript(file='out.ps', colormode='color') def paint(self, event): if self.eraser_on: paint_color = 'white' else: paint_color = 'black' if self.old_x and self.old_y: self.test_canvas.create_line(self.old_x, self.old_y, event.x, event.y, width=5.0, fill=paint_color, capstyle=tkinter.ROUND, smooth=tkinter.TRUE, splinesteps=36) self.draw.line((self.old_x, self.old_y, event.x, event.y), fill=paint_color, width=5) self.old_x = event.x self.old_y = event.y posbuffer.append([self.old_x,self.old_y]) def reset(self, event): self.old_x, self.old_y = None, None tmp = copy.deepcopy(posbuffer) datalist.append(tmp) posbuffer.clear() root = tkinter.Tk() app = Application(master=root) app.mainloop()
tkinterとPIL等を使って、線を描いたらそれをソースコードと同じ階層のdataというフォルダにjson形式で保存するプログラムがこれ。起動すると、お絵描きをすることができ、上のボタンを押すとデータを保存できます。
ドットを打つタイミングで座標データを回収してるみたいです。どこかのサイトからプログラムをコピペして、改造して作った記憶があります。データの保存の仕方があまりにも雑なので、このようなコードはあんまりおすすめしません。(絵を描くやつはとても助かりました。コピペ元へ感謝を。)
フーリエ変換して周波数に分解、sinとcosで戻す
import json import copy import cmath as cm import numpy as np import tkinter as tk import matplotlib.pyplot as plt import matplotlib.animation as animation import matplotlib.patches as patches Picture_size = 700 def dft(data): #合ってんの?これ ret = [] n = len(data) for k in range(0,n): w = cm.exp(-1j * cm.pi * 2.0 * k / float(n)) s = 0 for i in range(0,n): s += data[i] * (w ** i) / n * 2 ret.append(s) return ret #json読み取り print('Enter read data name') c = input() try: f = open('data\\' + c + '.json','r') except: print('Failed to open the file.') exit() print("File loading.....") rawdata = f.read() f.close() data = json.loads(rawdata) pos = [] i = 0 j = 0 length = data['length'] breakflag = False print('Converting data.....') while True: while True: try: pos.append(data[str(i) + ':' + str(j)]) except: if(length < i):breakflag = True if breakflag:break i += 1 j = 0 break j += 1 if breakflag:break #変換 x_ = [] y_ = [] for s in pos: x,y = s[0] - Picture_size // 2,Picture_size // 2 - s[1] x_.append(x) y_.append(y) n = len(x_) #LPF lpf_rate = 1 lpx = [0] * (n - lpf_rate) lpy = [0] * (n - lpf_rate) for i in range(0,n - lpf_rate): for j in range(0,lpf_rate): lpx[i] += x_[i + j] lpy[i] += y_[i + j] lpx[i] /= lpf_rate lpy[i] /= lpf_rate print("LPF rate is",lpf_rate,".") N = n - lpf_rate print("Number of data is",N) print('Fourier transrating with DFT......') #DFTで変換 Fx = np.fft.fft(lpx) Fy = np.fft.fft(lpy) print("Fourier transrating was successfully.") print("Getting frequency list......") #周波数リストを取得 x_freq = [] y_freq = [] for i in range(0,N // 2 - 1): x_freq.append(i + 1) y_freq.append(i + 1) print("Data format") #有効な範囲だけにぎゅむっと Fx = Fx[1:Fx.size // 2] Fy = Fy[1:Fy.size // 2] print("Restorationing with cos and sin function....") #周波数から復元 div = 300 N = Fx.size x = [0] * div y = [0] * div #実部 * cos + |虚部| * sin sin_table = [] cos_table = [] resol = 1600 for i in range(0,resol): sin_table.append(cm.sin(2 * cm.pi * i / resol)) cos_table.append(cm.cos(2 * cm.pi * i / resol)) def fsin(t:int): if t > 0: return sin_table[int(t) % 1600] else: return -sin_table[int(abs(t)) % 1600] def fcos(t:int): return cos_table[int(abs(t)) % 1600] for i in range(0,N): for j in range(0,div): #x[j] += (Fx[i].real * cm.cos(x_freq[i] * 2 * cm.pi * (1 - j / div)) + int(Fx[i].imag) * cm.sin(x_freq[i] * 2 * cm.pi * (1 - j / div))) / N #y[j] += (Fy[i].real * cm.cos(y_freq[i] * 2 * cm.pi * (1 - j / div)) + int(Fy[i].imag) * cm.sin(y_freq[i] * 2 * cm.pi * (1 - j / div))) / N x[j] += (Fx[i].real * fcos(x_freq[i] * resol * (1 - j / div)) + (Fx[i].imag * fsin(x_freq[i] * resol * (1 - j / div)))) / N y[j] += (Fy[i].real * fcos(y_freq[i] * resol * (1 - j / div)) + (Fy[i].imag * fsin(y_freq[i] * resol * (1 - j / div)))) / N #描画 fig = plt.figure() ims = [] plt.axes().set_aspect('equal') for i in range(0,div): #グラフ描画 im = plt.plot(x[0:i],y[0:i],'b') ims.append(im) ani = animation.ArtistAnimation(fig,ims,interval = 50) plt.show() #plt.plot(x,y) #plt.show()
まず、データを読み込んだらx座標とy座標でそれぞれフーリエ変換を実施して、周波数成分に分解します。計算結果の実部がcos(偶関数)成分で、虚部がsin(奇関数)成分を構成しているので、それぞれ分けて計算した後に足して出力データにつなげていき、Plot上に出力しているようです。ふつうは各周波数のゲインを調べたいので、この複素数の絶対値を取るのが一般的です。リアルタイムの音声データをぶち込んで変換し、絶対値を取って周波数でプロットしてやると、高い音とか低い音とかで伸びる場所が違うバランみたいなメーターを再現できます(なんていうんだアレ)。
numpyのFFT機能を使っています(dftって表示してるけどおそらくデータ数をうまいこと調整してFFTしているはず)。フーリエ変換(FFT、DFT、STFTなど、特に音声解析)に特化したライブラリはどうやらたくさんあるようで、コピペだけでリアルタイム音声の変換ができるやつもあります。librosaとか結構よかったです(これのスペクトログラムを表示する機能を使って、特定の音のパターンに反応する女児向け玩具を騙すためにその周波数を解析して、マイコンとスピーカーを使って騙す遊びをしたけど動画取り損ねた)。最初にdft関数を定義していますが、めちゃくちゃ遅かったような気がします。結局numpy製に落ち着いたのでしょうか、当時のことはまるで覚えていません。でもちゃんと今でも動きます。すごい。(率直)
一筆書きだと元の絵がかなり再現できますが、一度でもマウスのクリック離したりすると座標に不連続な部分が発生するため、その部分が高周波成分がクソデカになるので、めちゃくちゃわかりやすく発振します(ギブス現象って言うらしい)。
書きたかったことは書いたのでこの辺で。
ロボコンで作ったやつ
作ったもの思い出そうの一つ目。
とりあえず、ロボコン部としての活動で作ったものを置いておこうと思ったけど、1年と2年の時に作ったものは写真が見つからないのでパス。あ、でも2年の冬に移動用のライブラリ作ったっけ。これは動画がありますね。
カチカチ言ってるのは当時のリレーモタドラです。
たのしい pic.twitter.com/CgClx47RCv
— あぐちゃんさん❄️ (@Agchan_Luice) 2019年12月27日
プログラムの仕組みは秘密です。卒業したら原理の解説だけはするかも。
次。
3年の時のロボコンは2020年度。
https://www.cemedine.co.jp/cemedine_reports/kousenrobocon2020-kyusyuokinawa.html
ここに載ってる久留米のマシンの両方のプログラムをそれぞれ一部書きました。
Aチームはパソコンのペンタブのペンの位置と書いてる/書いてないをマシンに送信するプログラムを、Bチームは首の制御とマシン全体の制御と回路を担当しました。
首の制御の動画ありました。確かarcsinあたりで近似して動かしてたはず。
#ロボコン#久留米高専
— あぐちゃんさん❄️ (@Agchan_Luice) 2020年11月2日
ロールとピッチは軸ごとにモーターがあるんじゃなくて背中から2つのサーボで動かしてたりします
(この動画だとヨー軸がドリフトしてますが) pic.twitter.com/xOGnMUutrJ
本編はロボコンの映像がyoutubeに公式から上がってるはずなのでそっち見てください。
表立ってロボコンでやったことといえばこのくらいでしょうか。
飽きたのでおわり。
はつかきこ
自分用の掃き溜めです。読んでもためにならないと思います。たぶん。
何やってきたか自分でもよくわかんなくなってきたので、物置き場として活用します。めんどくさいので適当にしか書きません。
これまで作ってきた物のうち、記憶と記録があるものを一通り掘り出したら、進行中のお遊びを置いていく予定。気分が乗ったらちまちま書きます。
これは飯テロです。