はじめに
昨年の今頃から2ヶ月間、PINNsを使った2次元円柱後流のシミュレーションに取り組んだ。この記事を最後に取り組んでいなかった。 今回、新たなpython開発環境を構築したので、その例題としてこのテーマに取り組むことにした、今回はClaudeをお供に。
コード生成
プロンプト
Claudeに次のプロンプトで、ソースコードを生成させた。
PINNで2次元円柱後流のシミュレーションを行うpytorchで動くコードを生成してください。
生成コード
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from tqdm import tqdm
# デバイスの設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# ニューラルネットワークの定義
class PINN(nn.Module):
def __init__(self, layers):
super(PINN, self).__init__()
self.layers = nn.ModuleList()
for i in range(len(layers) - 1):
self.layers.append(nn.Linear(layers[i], layers[i+1]))
def forward(self, x, y, t):
# 入力の正規化
inputs = torch.cat([x, y, t], dim=1)
for i in range(len(self.layers) - 1):
inputs = torch.tanh(self.layers[i](inputs))
# 最終層(活性化関数なし)
output = self.layers[-1](inputs)
# 速度と圧力を出力
u = output[:, 0:1] # x方向速度
v = output[:, 1:2] # y方向速度
p = output[:, 2:3] # 圧力
return u, v, p
# Navier-Stokes方程式の残差計算
class NavierStokesPINN:
def __init__(self, layers, Re=100):
self.model = PINN(layers).to(device)
self.Re = Re # レイノルズ数
self.nu = 1.0 / Re # 動粘性係数
# 最適化アルゴリズム
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
self.scheduler = torch.optim.lr_scheduler.StepLR(
self.optimizer, step_size=5000, gamma=0.9
)
self.loss_history = []
def net_NS(self, x, y, t):
"""Navier-Stokes方程式の残差を計算"""
x.requires_grad_(True)
y.requires_grad_(True)
t.requires_grad_(True)
u, v, p = self.model(x, y, t)
# 1階微分
u_x = torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u),
create_graph=True)[0]
u_y = torch.autograd.grad(u, y, grad_outputs=torch.ones_like(u),
create_graph=True)[0]
u_t = torch.autograd.grad(u, t, grad_outputs=torch.ones_like(u),
create_graph=True)[0]
v_x = torch.autograd.grad(v, x, grad_outputs=torch.ones_like(v),
create_graph=True)[0]
v_y = torch.autograd.grad(v, y, grad_outputs=torch.ones_like(v),
create_graph=True)[0]
v_t = torch.autograd.grad(v, t, grad_outputs=torch.ones_like(v),
create_graph=True)[0]
p_x = torch.autograd.grad(p, x, grad_outputs=torch.ones_like(p),
create_graph=True)[0]
p_y = torch.autograd.grad(p, y, grad_outputs=torch.ones_like(p),
create_graph=True)[0]
# 2階微分
u_xx = torch.autograd.grad(u_x, x, grad_outputs=torch.ones_like(u_x),
create_graph=True)[0]
u_yy = torch.autograd.grad(u_y, y, grad_outputs=torch.ones_like(u_y),
create_graph=True)[0]
v_xx = torch.autograd.grad(v_x, x, grad_outputs=torch.ones_like(v_x),
create_graph=True)[0]
v_yy = torch.autograd.grad(v_y, y, grad_outputs=torch.ones_like(v_y),
create_graph=True)[0]
# Navier-Stokes方程式(運動量保存則)
f_u = u_t + u * u_x + v * u_y + p_x - self.nu * (u_xx + u_yy)
f_v = v_t + u * v_x + v * v_y + p_y - self.nu * (v_xx + v_yy)
# 連続の式(質量保存則)
f_cont = u_x + v_y
return f_u, f_v, f_cont, u, v, p
def loss_function(self, x_domain, y_domain, t_domain,
x_inlet, y_inlet, t_inlet,
x_outlet, y_outlet, t_outlet,
x_wall_top, y_wall_top, t_wall_top,
x_wall_bottom, y_wall_bottom, t_wall_bottom,
x_cylinder, y_cylinder, t_cylinder,
x_init, y_init, t_init):
"""損失関数の計算"""
# 領域内でのNavier-Stokes方程式の残差
f_u, f_v, f_cont, _, _, _ = self.net_NS(x_domain, y_domain, t_domain)
loss_pde = torch.mean(f_u**2) + torch.mean(f_v**2) + torch.mean(f_cont**2)
# 入口境界条件(一様流)
u_inlet, v_inlet, _ = self.model(x_inlet, y_inlet, t_inlet)
loss_inlet = torch.mean((u_inlet - 1.0)**2) + torch.mean(v_inlet**2)
# 出口境界条件(圧力ゼロ、速度の勾配ゼロに近似)
u_outlet, v_outlet, p_outlet = self.model(x_outlet, y_outlet, t_outlet)
loss_outlet = torch.mean(p_outlet**2)
# 上下壁境界条件(自由滑り)
u_wall_t, v_wall_t, _ = self.model(x_wall_top, y_wall_top, t_wall_top)
u_wall_b, v_wall_b, _ = self.model(x_wall_bottom, y_wall_bottom, t_wall_bottom)
loss_wall = torch.mean(v_wall_t**2) + torch.mean(v_wall_b**2)
# 円柱表面境界条件(無滑り条件)
u_cyl, v_cyl, _ = self.model(x_cylinder, y_cylinder, t_cylinder)
loss_cylinder = torch.mean(u_cyl**2) + torch.mean(v_cyl**2)
# 初期条件
u_init, v_init, _ = self.model(x_init, y_init, t_init)
loss_init = torch.mean((u_init - 1.0)**2) + torch.mean(v_init**2)
# 総損失
loss = loss_pde + 10.0 * (loss_inlet + loss_outlet + loss_wall +
loss_cylinder + loss_init)
return loss, loss_pde, loss_inlet, loss_outlet, loss_wall, loss_cylinder
def train(self, epochs, x_domain, y_domain, t_domain,
x_inlet, y_inlet, t_inlet,
x_outlet, y_outlet, t_outlet,
x_wall_top, y_wall_top, t_wall_top,
x_wall_bottom, y_wall_bottom, t_wall_bottom,
x_cylinder, y_cylinder, t_cylinder,
x_init, y_init, t_init):
"""モデルの訓練"""
self.model.train()
pbar = tqdm(range(epochs), desc="Training")
for epoch in pbar:
self.optimizer.zero_grad()
loss, loss_pde, loss_inlet, loss_outlet, loss_wall, loss_cylinder = \
self.loss_function(x_domain, y_domain, t_domain,
x_inlet, y_inlet, t_inlet,
x_outlet, y_outlet, t_outlet,
x_wall_top, y_wall_top, t_wall_top,
x_wall_bottom, y_wall_bottom, t_wall_bottom,
x_cylinder, y_cylinder, t_cylinder,
x_init, y_init, t_init)
loss.backward()
self.optimizer.step()
self.scheduler.step()
self.loss_history.append(loss.item())
if epoch % 100 == 0:
pbar.set_postfix({
'Loss': f'{loss.item():.4e}',
'PDE': f'{loss_pde.item():.4e}',
'BC': f'{(loss_inlet + loss_outlet + loss_wall + loss_cylinder).item():.4e}'
})
def predict(self, x, y, t):
"""予測"""
self.model.eval()
with torch.no_grad():
u, v, p = self.model(x, y, t)
return u, v, p
# サンプリング点の生成
def generate_training_data(N_domain=5000, N_boundary=500, N_initial=1000):
"""訓練データの生成"""
# 計算領域の定義
x_min, x_max = -2.0, 10.0
y_min, y_max = -2.0, 2.0
t_min, t_max = 0.0, 5.0
# 円柱のパラメータ
x_cyl, y_cyl = 0.0, 0.0
r_cyl = 0.5
# 領域内のランダムサンプリング(円柱内部を除く)
x_domain = []
y_domain = []
t_domain = []
while len(x_domain) < N_domain:
x = np.random.uniform(x_min, x_max, N_domain * 2)
y = np.random.uniform(y_min, y_max, N_domain * 2)
t = np.random.uniform(t_min, t_max, N_domain * 2)
# 円柱内部を除外
mask = (x - x_cyl)**2 + (y - y_cyl)**2 > r_cyl**2
x = x[mask]
y = y[mask]
t = t[mask]
x_domain.extend(x[:N_domain - len(x_domain)])
y_domain.extend(y[:N_domain - len(y_domain)])
t_domain.extend(t[:N_domain - len(t_domain)])
x_domain = torch.FloatTensor(x_domain).reshape(-1, 1).to(device)
y_domain = torch.FloatTensor(y_domain).reshape(-1, 1).to(device)
t_domain = torch.FloatTensor(t_domain).reshape(-1, 1).to(device)
# 入口境界(x = x_min)
x_inlet = torch.ones(N_boundary, 1).to(device) * x_min
y_inlet = torch.FloatTensor(np.random.uniform(y_min, y_max, N_boundary)).reshape(-1, 1).to(device)
t_inlet = torch.FloatTensor(np.random.uniform(t_min, t_max, N_boundary)).reshape(-1, 1).to(device)
# 出口境界(x = x_max)
x_outlet = torch.ones(N_boundary, 1).to(device) * x_max
y_outlet = torch.FloatTensor(np.random.uniform(y_min, y_max, N_boundary)).reshape(-1, 1).to(device)
t_outlet = torch.FloatTensor(np.random.uniform(t_min, t_max, N_boundary)).reshape(-1, 1).to(device)
# 上壁境界(y = y_max)
x_wall_top = torch.FloatTensor(np.random.uniform(x_min, x_max, N_boundary)).reshape(-1, 1).to(device)
y_wall_top = torch.ones(N_boundary, 1).to(device) * y_max
t_wall_top = torch.FloatTensor(np.random.uniform(t_min, t_max, N_boundary)).reshape(-1, 1).to(device)
# 下壁境界(y = y_min)
x_wall_bottom = torch.FloatTensor(np.random.uniform(x_min, x_max, N_boundary)).reshape(-1, 1).to(device)
y_wall_bottom = torch.ones(N_boundary, 1).to(device) * y_min
t_wall_bottom = torch.FloatTensor(np.random.uniform(t_min, t_max, N_boundary)).reshape(-1, 1).to(device)
# 円柱表面境界
theta = np.random.uniform(0, 2*np.pi, N_boundary)
x_cylinder = torch.FloatTensor(x_cyl + r_cyl * np.cos(theta)).reshape(-1, 1).to(device)
y_cylinder = torch.FloatTensor(y_cyl + r_cyl * np.sin(theta)).reshape(-1, 1).to(device)
t_cylinder = torch.FloatTensor(np.random.uniform(t_min, t_max, N_boundary)).reshape(-1, 1).to(device)
# 初期条件(t = 0)
x_init = []
y_init = []
while len(x_init) < N_initial:
x = np.random.uniform(x_min, x_max, N_initial * 2)
y = np.random.uniform(y_min, y_max, N_initial * 2)
mask = (x - x_cyl)**2 + (y - y_cyl)**2 > r_cyl**2
x = x[mask]
y = y[mask]
x_init.extend(x[:N_initial - len(x_init)])
y_init.extend(y[:N_initial - len(y_init)])
x_init = torch.FloatTensor(x_init).reshape(-1, 1).to(device)
y_init = torch.FloatTensor(y_init).reshape(-1, 1).to(device)
t_init = torch.zeros(N_initial, 1).to(device)
return (x_domain, y_domain, t_domain,
x_inlet, y_inlet, t_inlet,
x_outlet, y_outlet, t_outlet,
x_wall_top, y_wall_top, t_wall_top,
x_wall_bottom, y_wall_bottom, t_wall_bottom,
x_cylinder, y_cylinder, t_cylinder,
x_init, y_init, t_init)
# 可視化
def visualize_flow(pinn, t_value=2.0, save_path='flow_field.png'):
"""流れ場の可視化"""
x = np.linspace(-2, 10, 150)
y = np.linspace(-2, 2, 80)
X, Y = np.meshgrid(x, y)
x_flat = torch.FloatTensor(X.flatten()).reshape(-1, 1).to(device)
y_flat = torch.FloatTensor(Y.flatten()).reshape(-1, 1).to(device)
t_flat = torch.ones_like(x_flat) * t_value
u, v, p = pinn.predict(x_flat, y_flat, t_flat)
u = u.cpu().numpy().reshape(X.shape)
v = v.cpu().numpy().reshape(X.shape)
p = p.cpu().numpy().reshape(X.shape)
# 円柱内部をマスク
mask = (X - 0.0)**2 + (Y - 0.0)**2 < 0.5**2
u[mask] = np.nan
v[mask] = np.nan
p[mask] = np.nan
# 速度の大きさ
velocity_magnitude = np.sqrt(u**2 + v**2)
fig, axes = plt.subplots(2, 2, figsize=(16, 8))
# u速度
im1 = axes[0, 0].contourf(X, Y, u, levels=20, cmap='RdBu_r')
axes[0, 0].set_title(f'u-velocity at t={t_value}')
axes[0, 0].set_xlabel('x')
axes[0, 0].set_ylabel('y')
axes[0, 0].add_patch(plt.Circle((0, 0), 0.5, color='black'))
plt.colorbar(im1, ax=axes[0, 0])
# v速度
im2 = axes[0, 1].contourf(X, Y, v, levels=20, cmap='RdBu_r')
axes[0, 1].set_title(f'v-velocity at t={t_value}')
axes[0, 1].set_xlabel('x')
axes[0, 1].set_ylabel('y')
axes[0, 1].add_patch(plt.Circle((0, 0), 0.5, color='black'))
plt.colorbar(im2, ax=axes[0, 1])
# 圧力
im3 = axes[1, 0].contourf(X, Y, p, levels=20, cmap='RdBu_r')
axes[1, 0].set_title(f'Pressure at t={t_value}')
axes[1, 0].set_xlabel('x')
axes[1, 0].set_ylabel('y')
axes[1, 0].add_patch(plt.Circle((0, 0), 0.5, color='black'))
plt.colorbar(im3, ax=axes[1, 0])
# 速度の大きさとストリームライン
im4 = axes[1, 1].contourf(X, Y, velocity_magnitude, levels=20, cmap='viridis')
axes[1, 1].streamplot(X, Y, u, v, color='white', linewidth=0.5, density=1.5)
axes[1, 1].set_title(f'Velocity magnitude at t={t_value}')
axes[1, 1].set_xlabel('x')
axes[1, 1].set_ylabel('y')
axes[1, 1].add_patch(plt.Circle((0, 0), 0.5, color='black'))
plt.colorbar(im4, ax=axes[1, 1])
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.show()
print(f"Flow field visualization saved to {save_path}")
def plot_loss_history(pinn, save_path='loss_history.png'):
"""損失関数の履歴をプロット"""
plt.figure(figsize=(10, 6))
plt.plot(pinn.loss_history)
plt.yscale('log')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('Training Loss History')
plt.grid(True)
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.show()
print(f"Loss history plot saved to {save_path}")
# メイン実行
if __name__ == "__main__":
# ハイパーパラメータ
layers = [3, 64, 64, 64, 64, 3] # [入力層, 隠れ層..., 出力層]
Re = 100 # レイノルズ数
epochs = 10000 # 訓練エポック数
print("=" * 60)
print("2D Cylinder Flow Simulation using PINN")
print("=" * 60)
print(f"Reynolds number: {Re}")
print(f"Network architecture: {layers}")
print(f"Training epochs: {epochs}")
print("=" * 60)
# PINNモデルの初期化
pinn = NavierStokesPINN(layers, Re=Re)
# 訓練データの生成
print("\nGenerating training data...")
training_data = generate_training_data(
N_domain=5000,
N_boundary=500,
N_initial=1000
)
# モデルの訓練
print("\nStarting training...")
pinn.train(epochs, *training_data)
# 損失関数の履歴をプロット
print("\nPlotting loss history...")
plot_loss_history(pinn, save_path='/workspace/outputs/loss_history.png')
# 流れ場の可視化(複数の時刻)
print("\nVisualizing flow fields...")
for t in [0.5, 1.0, 2.0, 3.0]:
visualize_flow(pinn, t_value=t,
save_path=f'/workspace/outputs/flow_field_t{t:.1f}.png')
# モデルの保存
torch.save(pinn.model.state_dict(), '/workspace/outputs/pinn_cylinder_model.pth')
print("\nModel saved to /workspace/outputs/pinn_cylinder_model.pth")
print("\n" + "=" * 60)
print("Simulation completed!")
print("=" * 60)
実行結果
誤差の履歴

可視化

まとめ
こんなにあっさりと結果が得られるとは驚きである。昨年、一生懸命取り組んでいたことが無駄に感じる。まあ、昨年の経験は勉強にはなったと思っているが。
作成されたコードをもう少し解析してみたいと思う。